From 1c59e6e39a5225fdd8013e1ee293f9c6951e5e8f Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 14 Apr 2026 21:16:42 +0530 Subject: [PATCH 1/6] fix(deps): bump cmf-sdk-go v0.0.5 to v0.0.6 and fix breaking changes ComputePool.Status changed from *ComputePoolStatus to *map[string]map[string]interface{} and PostEnvironment.Name changed from string to *string. This updates all call sites to use SDK getters/setters, adds extractComputePoolPhase() helper for the generic status map, adds empty-name guards to environment create/update, and updates test handler and golden files accordingly. Co-Authored-By: Claude Opus 4.6 (1M context) --- go.mod | 2 +- go.sum | 4 +- internal/flink/command_compute_pool.go | 36 ++++++++++++- .../command_compute_pool_create_onprem.go | 2 +- .../command_compute_pool_describe_onprem.go | 2 +- .../flink/command_compute_pool_list_onprem.go | 2 +- internal/flink/command_environment_create.go | 2 +- internal/flink/command_environment_update.go | 2 +- pkg/flink/cmf_rest_client.go | 14 +++-- .../compute-pool/describe-success-json.golden | 2 +- .../flink/compute-pool/list-json.golden | 6 +-- test/test-server/flink_onprem_handler.go | 52 +++++++++++-------- 12 files changed, 85 insertions(+), 41 deletions(-) diff --git a/go.mod b/go.mod index 10f4f130b2..09f88c73ef 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1 github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 - github.com/confluentinc/cmf-sdk-go v0.0.5 + github.com/confluentinc/cmf-sdk-go v0.0.6 github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 github.com/confluentinc/go-editor v0.11.0 github.com/confluentinc/go-prompt v0.2.40 diff --git a/go.sum b/go.sum index 857ba97598..880d4e9c59 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI= github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI= github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q= -github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE= -github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= +github.com/confluentinc/cmf-sdk-go v0.0.6 h1:3BFyPOJb4xBAvBMU1hXSh9+2kn/U2zr4EKDoWf8QM74= +github.com/confluentinc/cmf-sdk-go v0.0.6/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 h1:y9wh3z7FdqN3RJ9IHW12hzytJx4KjlpviPWn4ncA5u0= github.com/confluentinc/confluent-kafka-go/v2 v2.13.0/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI= github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU= diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 339e571485..4009ba1477 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -1,11 +1,14 @@ package flink import ( + "encoding/json" + "github.com/spf13/cobra" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" "github.com/confluentinc/cli/v4/pkg/config" + "github.com/confluentinc/cli/v4/pkg/output" ) type computePoolOut struct { @@ -60,6 +63,35 @@ func (c *command) validComputePoolArgs(cmd *cobra.Command, args []string) []stri return c.autocompleteComputePools(cmd, args) } +// extractComputePoolPhase extracts the phase string from the generic status map. +// ComputePool.Status changed from *ComputePoolStatus to *map[string]map[string]interface{} in SDK v0.0.6. +// The status is a nested map where phase is at status["phase"]["value"]. +func extractComputePoolPhase(pool cmfsdk.ComputePool) string { + if pool.Status == nil { + return "" + } + status := pool.GetStatus() + if phaseMap, ok := status["phase"]; ok { + if value, ok := phaseMap["value"].(string); ok { + return value + } + } + // Fallback: try re-parsing as a simpler type in case the API shape varies. + raw, err := json.Marshal(pool.Status) + if err != nil { + output.ErrPrintf(false, "Warning: failed to marshal compute pool status: %v\n", err) + return "" + } + var flat map[string]string + if err := json.Unmarshal(raw, &flat); err == nil { + if phase, ok := flat["phase"]; ok { + return phase + } + } + output.ErrPrintf(false, "Warning: compute pool has status but phase could not be extracted\n") + return "" +} + func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) LocalComputePool { localPool := LocalComputePool{ ApiVersion: sdkComputePool.ApiVersion, @@ -77,9 +109,9 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) }, } - if sdkComputePool.Status != nil { + if phase := extractComputePoolPhase(sdkComputePool); phase != "" { localPool.Status = &LocalComputePoolStatus{ - Phase: sdkComputePool.Status.Phase, + Phase: phase, } } diff --git a/internal/flink/command_compute_pool_create_onprem.go b/internal/flink/command_compute_pool_create_onprem.go index d483594c45..743cbac506 100644 --- a/internal/flink/command_compute_pool_create_onprem.go +++ b/internal/flink/command_compute_pool_create_onprem.go @@ -92,7 +92,7 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err CreationTime: creationTime, Name: sdkComputePool.GetMetadata().Name, Type: sdkComputePool.GetSpec().Type, - Phase: sdkOutputComputePool.GetStatus().Phase, + Phase: extractComputePoolPhase(sdkOutputComputePool), }) return table.Print() } diff --git a/internal/flink/command_compute_pool_describe_onprem.go b/internal/flink/command_compute_pool_describe_onprem.go index 408345ae83..15d1596328 100644 --- a/internal/flink/command_compute_pool_describe_onprem.go +++ b/internal/flink/command_compute_pool_describe_onprem.go @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e CreationTime: creationTime, Name: sdkComputePool.GetMetadata().Name, Type: sdkComputePool.GetSpec().Type, - Phase: sdkComputePool.GetStatus().Phase, + Phase: extractComputePoolPhase(sdkComputePool), }) return table.Print() } diff --git a/internal/flink/command_compute_pool_list_onprem.go b/internal/flink/command_compute_pool_list_onprem.go index 609dfe6cfe..c7a9ff4f71 100644 --- a/internal/flink/command_compute_pool_list_onprem.go +++ b/internal/flink/command_compute_pool_list_onprem.go @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error { CreationTime: creationTime, Name: pool.GetMetadata().Name, Type: pool.GetSpec().Type, - Phase: pool.GetStatus().Phase, + Phase: extractComputePoolPhase(pool), }) } return list.Print() diff --git a/internal/flink/command_environment_create.go b/internal/flink/command_environment_create.go index bc1fccfe59..f3d72e1ba7 100644 --- a/internal/flink/command_environment_create.go +++ b/internal/flink/command_environment_create.go @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error { } var postEnvironment cmfsdk.PostEnvironment - postEnvironment.Name = environmentName + postEnvironment.SetName(environmentName) postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed postEnvironment.KubernetesNamespace = &kubernetesNamespace postEnvironment.StatementDefaults = &defaultsStatementParsed diff --git a/internal/flink/command_environment_update.go b/internal/flink/command_environment_update.go index 008dad94fb..ee9cf049a1 100644 --- a/internal/flink/command_environment_update.go +++ b/internal/flink/command_environment_update.go @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error { } var postEnvironment cmfsdk.PostEnvironment - postEnvironment.Name = environmentName + postEnvironment.SetName(environmentName) postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed postEnvironment.StatementDefaults = &defaultsStatementParsed postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..f8163d0905 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -231,7 +231,10 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme // CreateEnvironment Create an environment. // Internally, since the call for Create and Update is the same, we check if the environment exists before creation. func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) { - environmentName := postEnvironment.Name + environmentName := postEnvironment.GetName() + if environmentName == "" { + return cmfsdk.Environment{}, fmt.Errorf("environment name is required") + } _, httpResponse, _ := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute() // check if the environment exists by checking the status code if httpResponse != nil && httpResponse.StatusCode == http.StatusOK { @@ -281,10 +284,13 @@ func (cmfClient *CmfRestClient) ListEnvironments(ctx context.Context) ([]cmfsdk. return environments, nil } -// UpdateEnvironment Create an environment. -// Internally, since the call for Create and Update is the same, we check if the environment exists before updation. +// UpdateEnvironment updates an environment. +// Internally, since the call for Create and Update is the same, we check if the environment exists before updating. func (cmfClient *CmfRestClient) UpdateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) { - environmentName := postEnvironment.Name + environmentName := postEnvironment.GetName() + if environmentName == "" { + return cmfsdk.Environment{}, fmt.Errorf("environment name is required") + } _, httpResponse, err := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute() // check if the environment exists by checking the status code if httpResponse != nil && httpResponse.StatusCode == http.StatusNotFound { diff --git a/test/fixtures/output/flink/compute-pool/describe-success-json.golden b/test/fixtures/output/flink/compute-pool/describe-success-json.golden index a27e7d1df6..bc170897db 100644 --- a/test/fixtures/output/flink/compute-pool/describe-success-json.golden +++ b/test/fixtures/output/flink/compute-pool/describe-success-json.golden @@ -7,7 +7,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": null + "clusterSpec": {} }, "status": { "phase": "RUNNING" diff --git a/test/fixtures/output/flink/compute-pool/list-json.golden b/test/fixtures/output/flink/compute-pool/list-json.golden index 57f2a301db..c48d4f7dae 100644 --- a/test/fixtures/output/flink/compute-pool/list-json.golden +++ b/test/fixtures/output/flink/compute-pool/list-json.golden @@ -8,7 +8,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": null + "clusterSpec": {} }, "status": { "phase": "RUNNING" @@ -23,7 +23,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": null + "clusterSpec": {} }, "status": { "phase": "PENDING" @@ -38,7 +38,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": null + "clusterSpec": {} }, "status": { "phase": "COMPLETE" diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..04acd4632f 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -185,22 +185,26 @@ func createSavepoint(name string) cmfsdk.Savepoint { } } -func createComputePool(poolName, phase string) cmfsdk.ComputePool { +// createComputePool returns a raw map instead of cmfsdk.ComputePool because +// in SDK v0.0.6, ComputePool.Status is typed as *map[string]map[string]interface{}. +// The status must use nested maps so the SDK client can deserialize the response. +func createComputePool(poolName, phase string) map[string]interface{} { timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() - status := cmfsdk.ComputePoolStatus{ - Phase: phase, - } - - return cmfsdk.ComputePool{ - Metadata: cmfsdk.ComputePoolMetadata{ - Name: poolName, - CreationTimestamp: &timeStamp, + return map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": poolName, + "creationTimestamp": timeStamp, }, - Spec: cmfsdk.ComputePoolSpec{ - Type: "DEDICATED", + "spec": map[string]interface{}{ + "type": "DEDICATED", + "clusterSpec": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "phase": map[string]interface{}{ + "value": phase, + }, }, - Status: &status, } } @@ -364,14 +368,15 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { err = json.Unmarshal(reqBody, &environment) require.NoError(t, err) - if strings.Contains(environment.Name, "failure") { + envName := environment.GetName() + if strings.Contains(envName, "failure") { http.Error(w, "", http.StatusUnprocessableEntity) return } // Already existing environment: update - if environment.Name == "default" || environment.Name == "test" { - outputEnvironment := createEnvironment(environment.Name, environment.Name+"-namespace") + if envName == "default" || envName == "test" { + outputEnvironment := createEnvironment(envName, envName+"-namespace") // This is a dummy update - only the defaults can be updated anyway. outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults @@ -382,7 +387,7 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { } // New environment: create - outputEnvironment := createEnvironment(environment.Name, environment.GetKubernetesNamespace()) + outputEnvironment := createEnvironment(envName, environment.GetKubernetesNamespace()) outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults outputEnvironment.StatementDefaults = environment.StatementDefaults @@ -782,16 +787,17 @@ func handleCmfComputePools(t *testing.T) http.HandlerFunc { switch r.Method { case http.MethodGet: - computePool1 := createComputePool("test-pool1", "RUNNING") - computePool2 := createComputePool("test-pool2", "PENDING") - computePool3 := createComputePool("test-pool3", "COMPLETE") - - computePools := []cmfsdk.ComputePool{computePool1, computePool2, computePool3} - computePoolsPage := cmfsdk.ComputePoolsPage{} + computePoolsPage := map[string]interface{}{ + "items": []interface{}{}, + } page := r.URL.Query().Get("page") if page == "0" { - computePoolsPage.SetItems(computePools) + computePoolsPage["items"] = []interface{}{ + createComputePool("test-pool1", "RUNNING"), + createComputePool("test-pool2", "PENDING"), + createComputePool("test-pool3", "COMPLETE"), + } } err := json.NewEncoder(w).Encode(computePoolsPage) From adf79631bc1017716714e5983e6bad47c4efcdf9 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 14 Apr 2026 22:51:18 +0530 Subject: [PATCH 2/6] refactor: remove dead fallback code from extractComputePoolPhase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The JSON marshal/unmarshal fallback path can never succeed given the SDK type map[string]map[string]interface{} — marshaling always produces nested objects which cannot unmarshal into map[string]string. Remove the unreachable code and stderr warnings to keep the function side-effect free. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/flink/command_compute_pool.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 4009ba1477..c3281e2bb8 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -1,14 +1,11 @@ package flink import ( - "encoding/json" - "github.com/spf13/cobra" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" "github.com/confluentinc/cli/v4/pkg/config" - "github.com/confluentinc/cli/v4/pkg/output" ) type computePoolOut struct { @@ -76,19 +73,6 @@ func extractComputePoolPhase(pool cmfsdk.ComputePool) string { return value } } - // Fallback: try re-parsing as a simpler type in case the API shape varies. - raw, err := json.Marshal(pool.Status) - if err != nil { - output.ErrPrintf(false, "Warning: failed to marshal compute pool status: %v\n", err) - return "" - } - var flat map[string]string - if err := json.Unmarshal(raw, &flat); err == nil { - if phase, ok := flat["phase"]; ok { - return phase - } - } - output.ErrPrintf(false, "Warning: compute pool has status but phase could not be extracted\n") return "" } From d18db0491e2d16d37583810960a1c4dad17137f6 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Thu, 16 Apr 2026 16:58:04 +0530 Subject: [PATCH 3/6] fix: add fallback parsing for ComputePool.Status deserialization failure SDK v0.0.6 types ComputePool.Status as *map[string]map[string]interface{} but the API returns a flat object like {"phase":"RUNNING"}, causing the SDK's Execute() to fail on valid 200 responses. Add fallback helpers that re-parse the buffered response body when this occurs. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/flink/command_compute_pool.go | 5 +- pkg/flink/cmf_rest_client.go | 79 ++++++++++++++++++++++++ test/test-server/flink_onprem_handler.go | 10 ++- 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index c3281e2bb8..2984d71a2b 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -61,8 +61,9 @@ func (c *command) validComputePoolArgs(cmd *cobra.Command, args []string) []stri } // extractComputePoolPhase extracts the phase string from the generic status map. -// ComputePool.Status changed from *ComputePoolStatus to *map[string]map[string]interface{} in SDK v0.0.6. -// The status is a nested map where phase is at status["phase"]["value"]. +// ComputePool.Status is typed as *map[string]map[string]interface{} in SDK v0.0.6. +// The unmarshalComputePool workaround in cmf_rest_client.go encodes the flat API +// status into this nested format, placing the phase at status["phase"]["value"]. func extractComputePoolPhase(pool cmfsdk.ComputePool) string { if pool.Status == nil { return "" diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index f8163d0905..414b9e29b1 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -1,7 +1,9 @@ package flink import ( + "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -420,6 +422,15 @@ func (cmfClient *CmfRestClient) CreateComputePool(ctx context.Context, environme return cmfsdk.ComputePool{}, fmt.Errorf("compute pool name is required") } outputComputePool, httpResponse, err := cmfClient.SQLApi.CreateComputePool(ctx, environment).ComputePool(computePool).Execute() + // Fallback: SDK v0.0.6 cannot deserialize ComputePool.Status (see unmarshalComputePool). + if err != nil && httpResponse != nil && httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300 { + if body, readErr := io.ReadAll(httpResponse.Body); readErr == nil { + if pool, parseErr := unmarshalComputePool(body); parseErr == nil { + return pool, nil + } + httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) + } + } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return cmfsdk.ComputePool{}, fmt.Errorf(`failed to create compute pool "%s" in the environment "%s": %s`, computePoolName, environment, parsedErr) } @@ -433,6 +444,15 @@ func (cmfClient *CmfRestClient) DeleteComputePool(ctx context.Context, environme func (cmfClient *CmfRestClient) DescribeComputePool(ctx context.Context, environment, computePool string) (cmfsdk.ComputePool, error) { cmfComputePool, httpResponse, err := cmfClient.SQLApi.GetComputePool(ctx, environment, computePool).Execute() + // Fallback: SDK v0.0.6 cannot deserialize ComputePool.Status (see unmarshalComputePool). + if err != nil && httpResponse != nil && httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300 { + if body, readErr := io.ReadAll(httpResponse.Body); readErr == nil { + if pool, parseErr := unmarshalComputePool(body); parseErr == nil { + return pool, nil + } + httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) + } + } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return cmfsdk.ComputePool{}, fmt.Errorf(`failed to describe compute pool "%s" in the environment "%s": %s`, computePool, environment, parsedErr) } @@ -448,6 +468,17 @@ func (cmfClient *CmfRestClient) ListComputePools(ctx context.Context, environmen for !done { computePoolsPage, httpResponse, err := cmfClient.SQLApi.GetComputePools(ctx, environment).Page(currentPageNumber).Size(pageSize).Execute() + // Fallback: SDK v0.0.6 cannot deserialize ComputePool.Status (see unmarshalComputePool). + if err != nil && httpResponse != nil && httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300 { + if body, readErr := io.ReadAll(httpResponse.Body); readErr == nil { + if parsed, parseErr := unmarshalComputePoolsPage(body); parseErr == nil { + computePools = append(computePools, parsed...) + currentPageNumber, done = extractPageOptions(len(parsed), currentPageNumber) + continue + } + httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) + } + } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return nil, fmt.Errorf(`failed to list compute pools in the environment "%s": %s`, environment, parsedErr) } @@ -576,6 +607,54 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +// --- ComputePool deserialization workaround (SDK v0.0.6) --- +// +// The SDK types ComputePool.Status as *map[string]map[string]interface{} but the +// API returns a flat object like {"phase":"RUNNING","message":null}. Go's JSON +// unmarshaler cannot decode a string ("RUNNING") into the expected inner type +// map[string]interface{}, so Execute() fails on valid 200 responses. These helpers +// re-parse the buffered response body with correct handling. +// Remove when the SDK ships a properly typed ComputePoolStatus struct. + +func unmarshalComputePool(data []byte) (cmfsdk.ComputePool, error) { + var wrapper struct { + cmfsdk.ComputePool + Status json.RawMessage `json:"status"` + } + if err := json.Unmarshal(data, &wrapper); err != nil { + return cmfsdk.ComputePool{}, err + } + pool := wrapper.ComputePool + var status struct { + Phase string `json:"phase"` + } + if json.Unmarshal(wrapper.Status, &status) == nil && status.Phase != "" { + s := map[string]map[string]interface{}{ + "phase": {"value": status.Phase}, + } + pool.Status = &s + } + return pool, nil +} + +func unmarshalComputePoolsPage(data []byte) ([]cmfsdk.ComputePool, error) { + var page struct { + Items []json.RawMessage `json:"items"` + } + if err := json.Unmarshal(data, &page); err != nil { + return nil, err + } + pools := make([]cmfsdk.ComputePool, 0, len(page.Items)) + for _, raw := range page.Items { + pool, err := unmarshalComputePool(raw) + if err != nil { + return nil, err + } + pools = append(pools, pool) + } + return pools, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 04acd4632f..e5a848c3ca 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -185,9 +185,9 @@ func createSavepoint(name string) cmfsdk.Savepoint { } } -// createComputePool returns a raw map instead of cmfsdk.ComputePool because -// in SDK v0.0.6, ComputePool.Status is typed as *map[string]map[string]interface{}. -// The status must use nested maps so the SDK client can deserialize the response. +// createComputePool returns a raw map with a flat status matching the real CMF API response. +// The SDK v0.0.6 cannot deserialize this directly; the CLI's unmarshalComputePool fallback +// in cmf_rest_client.go handles it. func createComputePool(poolName, phase string) map[string]interface{} { timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() @@ -201,9 +201,7 @@ func createComputePool(poolName, phase string) map[string]interface{} { "clusterSpec": map[string]interface{}{}, }, "status": map[string]interface{}{ - "phase": map[string]interface{}{ - "value": phase, - }, + "phase": phase, }, } } From 2704dd1e5f54979f3bdb65ae1af400199eab1422 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Thu, 16 Apr 2026 17:27:35 +0530 Subject: [PATCH 4/6] Fix comments --- internal/flink/command_compute_pool.go | 2 -- pkg/flink/cmf_rest_client.go | 6 +----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 2984d71a2b..2b9944c277 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -62,8 +62,6 @@ func (c *command) validComputePoolArgs(cmd *cobra.Command, args []string) []stri // extractComputePoolPhase extracts the phase string from the generic status map. // ComputePool.Status is typed as *map[string]map[string]interface{} in SDK v0.0.6. -// The unmarshalComputePool workaround in cmf_rest_client.go encodes the flat API -// status into this nested format, placing the phase at status["phase"]["value"]. func extractComputePoolPhase(pool cmfsdk.ComputePool) string { if pool.Status == nil { return "" diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 414b9e29b1..c2f01d456c 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -610,12 +610,8 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s // --- ComputePool deserialization workaround (SDK v0.0.6) --- // // The SDK types ComputePool.Status as *map[string]map[string]interface{} but the -// API returns a flat object like {"phase":"RUNNING","message":null}. Go's JSON -// unmarshaler cannot decode a string ("RUNNING") into the expected inner type -// map[string]interface{}, so Execute() fails on valid 200 responses. These helpers +// API returns a flat object like {"phase":"RUNNING","message":null}.These helpers // re-parse the buffered response body with correct handling. -// Remove when the SDK ships a properly typed ComputePoolStatus struct. - func unmarshalComputePool(data []byte) (cmfsdk.ComputePool, error) { var wrapper struct { cmfsdk.ComputePool From d16c1b8aece4fa309127710eceb52e55848703d4 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 20 Apr 2026 09:09:28 +0530 Subject: [PATCH 5/6] test: revert golden file changes by dropping clusterSpec from mock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit rewrote createComputePool to a raw map and explicitly set clusterSpec to an empty map, which serialized as {} in golden JSON outputs (vs the prior null). Dropping the clusterSpec key from the mock leaves ComputePool.Spec.ClusterSpec at its zero value (nil map), which serializes as null — matching the original golden files. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../output/flink/compute-pool/describe-success-json.golden | 2 +- test/fixtures/output/flink/compute-pool/list-json.golden | 6 +++--- test/test-server/flink_onprem_handler.go | 3 +-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/test/fixtures/output/flink/compute-pool/describe-success-json.golden b/test/fixtures/output/flink/compute-pool/describe-success-json.golden index bc170897db..a27e7d1df6 100644 --- a/test/fixtures/output/flink/compute-pool/describe-success-json.golden +++ b/test/fixtures/output/flink/compute-pool/describe-success-json.golden @@ -7,7 +7,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": {} + "clusterSpec": null }, "status": { "phase": "RUNNING" diff --git a/test/fixtures/output/flink/compute-pool/list-json.golden b/test/fixtures/output/flink/compute-pool/list-json.golden index c48d4f7dae..57f2a301db 100644 --- a/test/fixtures/output/flink/compute-pool/list-json.golden +++ b/test/fixtures/output/flink/compute-pool/list-json.golden @@ -8,7 +8,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": {} + "clusterSpec": null }, "status": { "phase": "RUNNING" @@ -23,7 +23,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": {} + "clusterSpec": null }, "status": { "phase": "PENDING" @@ -38,7 +38,7 @@ }, "spec": { "type": "DEDICATED", - "clusterSpec": {} + "clusterSpec": null }, "status": { "phase": "COMPLETE" diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index e5a848c3ca..bde52e9010 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -197,8 +197,7 @@ func createComputePool(poolName, phase string) map[string]interface{} { "creationTimestamp": timeStamp, }, "spec": map[string]interface{}{ - "type": "DEDICATED", - "clusterSpec": map[string]interface{}{}, + "type": "DEDICATED", }, "status": map[string]interface{}{ "phase": phase, From a182d1c7a4b9c9517dfd690e33cb6b66ac4c81c5 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 20 Apr 2026 09:22:18 +0530 Subject: [PATCH 6/6] refactor: deduplicate ComputePool fallback blocks into a generic helper The fallback pattern (re-read buffered response body, attempt manual parse, re-buffer on failure) was duplicated across CreateComputePool, DescribeComputePool, and ListComputePools. Extract readFallbackBody and a generic tryComputePoolFallback helper so each call site collapses to a three-line check, and the failure/re-buffer discipline lives in one place. Co-Authored-By: Claude Opus 4.6 (1M context) --- pkg/flink/cmf_rest_client.go | 65 +++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index c2f01d456c..52d3f4db3d 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -422,14 +422,8 @@ func (cmfClient *CmfRestClient) CreateComputePool(ctx context.Context, environme return cmfsdk.ComputePool{}, fmt.Errorf("compute pool name is required") } outputComputePool, httpResponse, err := cmfClient.SQLApi.CreateComputePool(ctx, environment).ComputePool(computePool).Execute() - // Fallback: SDK v0.0.6 cannot deserialize ComputePool.Status (see unmarshalComputePool). - if err != nil && httpResponse != nil && httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300 { - if body, readErr := io.ReadAll(httpResponse.Body); readErr == nil { - if pool, parseErr := unmarshalComputePool(body); parseErr == nil { - return pool, nil - } - httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) - } + if pool, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePool); ok { + return pool, nil } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return cmfsdk.ComputePool{}, fmt.Errorf(`failed to create compute pool "%s" in the environment "%s": %s`, computePoolName, environment, parsedErr) @@ -444,14 +438,8 @@ func (cmfClient *CmfRestClient) DeleteComputePool(ctx context.Context, environme func (cmfClient *CmfRestClient) DescribeComputePool(ctx context.Context, environment, computePool string) (cmfsdk.ComputePool, error) { cmfComputePool, httpResponse, err := cmfClient.SQLApi.GetComputePool(ctx, environment, computePool).Execute() - // Fallback: SDK v0.0.6 cannot deserialize ComputePool.Status (see unmarshalComputePool). - if err != nil && httpResponse != nil && httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300 { - if body, readErr := io.ReadAll(httpResponse.Body); readErr == nil { - if pool, parseErr := unmarshalComputePool(body); parseErr == nil { - return pool, nil - } - httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) - } + if pool, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePool); ok { + return pool, nil } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return cmfsdk.ComputePool{}, fmt.Errorf(`failed to describe compute pool "%s" in the environment "%s": %s`, computePool, environment, parsedErr) @@ -468,16 +456,10 @@ func (cmfClient *CmfRestClient) ListComputePools(ctx context.Context, environmen for !done { computePoolsPage, httpResponse, err := cmfClient.SQLApi.GetComputePools(ctx, environment).Page(currentPageNumber).Size(pageSize).Execute() - // Fallback: SDK v0.0.6 cannot deserialize ComputePool.Status (see unmarshalComputePool). - if err != nil && httpResponse != nil && httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300 { - if body, readErr := io.ReadAll(httpResponse.Body); readErr == nil { - if parsed, parseErr := unmarshalComputePoolsPage(body); parseErr == nil { - computePools = append(computePools, parsed...) - currentPageNumber, done = extractPageOptions(len(parsed), currentPageNumber) - continue - } - httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) - } + if parsed, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePoolsPage); ok { + computePools = append(computePools, parsed...) + currentPageNumber, done = extractPageOptions(len(parsed), currentPageNumber) + continue } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return nil, fmt.Errorf(`failed to list compute pools in the environment "%s": %s`, environment, parsedErr) @@ -651,6 +633,37 @@ func unmarshalComputePoolsPage(data []byte) ([]cmfsdk.ComputePool, error) { return pools, nil } +// readFallbackBody returns the HTTP response body when the SDK's Execute() returned +// an error despite a successful 2xx status — the ComputePool deserialization bug. +// Returns nil if the condition doesn't apply or the body can't be read. +func readFallbackBody(httpResponse *http.Response, err error) []byte { + if err == nil || httpResponse == nil || httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 { + return nil + } + body, readErr := io.ReadAll(httpResponse.Body) + if readErr != nil { + return nil + } + return body +} + +// tryComputePoolFallback runs the fallback parse for a ComputePool-bearing response. +// Returns (result, true) on success. On failure, re-buffers the response body so +// parseSdkError can still read it, and returns (zero, false). +func tryComputePoolFallback[T any](httpResponse *http.Response, err error, unmarshal func([]byte) (T, error)) (T, bool) { + var zero T + body := readFallbackBody(httpResponse, err) + if body == nil { + return zero, false + } + result, parseErr := unmarshal(body) + if parseErr != nil { + httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) + return zero, false + } + return result, true +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 {