diff --git a/go.mod b/go.mod index 10f4f130b2..09f88c73ef 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1 github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 - github.com/confluentinc/cmf-sdk-go v0.0.5 + github.com/confluentinc/cmf-sdk-go v0.0.6 github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 github.com/confluentinc/go-editor v0.11.0 github.com/confluentinc/go-prompt v0.2.40 diff --git a/go.sum b/go.sum index 857ba97598..880d4e9c59 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI= github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI= github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q= -github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE= -github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= +github.com/confluentinc/cmf-sdk-go v0.0.6 h1:3BFyPOJb4xBAvBMU1hXSh9+2kn/U2zr4EKDoWf8QM74= +github.com/confluentinc/cmf-sdk-go v0.0.6/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 h1:y9wh3z7FdqN3RJ9IHW12hzytJx4KjlpviPWn4ncA5u0= github.com/confluentinc/confluent-kafka-go/v2 v2.13.0/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI= github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU= diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 339e571485..2b9944c277 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -60,6 +60,21 @@ func (c *command) validComputePoolArgs(cmd *cobra.Command, args []string) []stri return c.autocompleteComputePools(cmd, args) } +// extractComputePoolPhase extracts the phase string from the generic status map. +// ComputePool.Status is typed as *map[string]map[string]interface{} in SDK v0.0.6. +func extractComputePoolPhase(pool cmfsdk.ComputePool) string { + if pool.Status == nil { + return "" + } + status := pool.GetStatus() + if phaseMap, ok := status["phase"]; ok { + if value, ok := phaseMap["value"].(string); ok { + return value + } + } + return "" +} + func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) LocalComputePool { localPool := LocalComputePool{ ApiVersion: sdkComputePool.ApiVersion, @@ -77,9 +92,9 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) }, } - if sdkComputePool.Status != nil { + if phase := extractComputePoolPhase(sdkComputePool); phase != "" { localPool.Status = &LocalComputePoolStatus{ - Phase: sdkComputePool.Status.Phase, + Phase: phase, } } diff --git a/internal/flink/command_compute_pool_create_onprem.go b/internal/flink/command_compute_pool_create_onprem.go index d483594c45..743cbac506 100644 --- a/internal/flink/command_compute_pool_create_onprem.go +++ b/internal/flink/command_compute_pool_create_onprem.go @@ -92,7 +92,7 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err CreationTime: creationTime, Name: sdkComputePool.GetMetadata().Name, Type: sdkComputePool.GetSpec().Type, - Phase: sdkOutputComputePool.GetStatus().Phase, + Phase: extractComputePoolPhase(sdkOutputComputePool), }) return table.Print() } diff --git a/internal/flink/command_compute_pool_describe_onprem.go b/internal/flink/command_compute_pool_describe_onprem.go index 408345ae83..15d1596328 100644 --- a/internal/flink/command_compute_pool_describe_onprem.go +++ b/internal/flink/command_compute_pool_describe_onprem.go @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e CreationTime: creationTime, Name: sdkComputePool.GetMetadata().Name, Type: sdkComputePool.GetSpec().Type, - Phase: sdkComputePool.GetStatus().Phase, + Phase: extractComputePoolPhase(sdkComputePool), }) return table.Print() } diff --git a/internal/flink/command_compute_pool_list_onprem.go b/internal/flink/command_compute_pool_list_onprem.go index 609dfe6cfe..c7a9ff4f71 100644 --- a/internal/flink/command_compute_pool_list_onprem.go +++ b/internal/flink/command_compute_pool_list_onprem.go @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error { CreationTime: creationTime, Name: pool.GetMetadata().Name, Type: pool.GetSpec().Type, - Phase: pool.GetStatus().Phase, + Phase: extractComputePoolPhase(pool), }) } return list.Print() diff --git a/internal/flink/command_environment_create.go b/internal/flink/command_environment_create.go index bc1fccfe59..f3d72e1ba7 100644 --- a/internal/flink/command_environment_create.go +++ b/internal/flink/command_environment_create.go @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error { } var postEnvironment cmfsdk.PostEnvironment - postEnvironment.Name = environmentName + postEnvironment.SetName(environmentName) postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed postEnvironment.KubernetesNamespace = &kubernetesNamespace postEnvironment.StatementDefaults = &defaultsStatementParsed diff --git a/internal/flink/command_environment_update.go b/internal/flink/command_environment_update.go index 008dad94fb..ee9cf049a1 100644 --- a/internal/flink/command_environment_update.go +++ b/internal/flink/command_environment_update.go @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error { } var postEnvironment cmfsdk.PostEnvironment - postEnvironment.Name = environmentName + postEnvironment.SetName(environmentName) postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed postEnvironment.StatementDefaults = &defaultsStatementParsed postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..52d3f4db3d 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -1,7 +1,9 @@ package flink import ( + "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -231,7 +233,10 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme // CreateEnvironment Create an environment. // Internally, since the call for Create and Update is the same, we check if the environment exists before creation. func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) { - environmentName := postEnvironment.Name + environmentName := postEnvironment.GetName() + if environmentName == "" { + return cmfsdk.Environment{}, fmt.Errorf("environment name is required") + } _, httpResponse, _ := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute() // check if the environment exists by checking the status code if httpResponse != nil && httpResponse.StatusCode == http.StatusOK { @@ -281,10 +286,13 @@ func (cmfClient *CmfRestClient) ListEnvironments(ctx context.Context) ([]cmfsdk. return environments, nil } -// UpdateEnvironment Create an environment. -// Internally, since the call for Create and Update is the same, we check if the environment exists before updation. +// UpdateEnvironment updates an environment. +// Internally, since the call for Create and Update is the same, we check if the environment exists before updating. func (cmfClient *CmfRestClient) UpdateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) { - environmentName := postEnvironment.Name + environmentName := postEnvironment.GetName() + if environmentName == "" { + return cmfsdk.Environment{}, fmt.Errorf("environment name is required") + } _, httpResponse, err := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute() // check if the environment exists by checking the status code if httpResponse != nil && httpResponse.StatusCode == http.StatusNotFound { @@ -414,6 +422,9 @@ func (cmfClient *CmfRestClient) CreateComputePool(ctx context.Context, environme return cmfsdk.ComputePool{}, fmt.Errorf("compute pool name is required") } outputComputePool, httpResponse, err := cmfClient.SQLApi.CreateComputePool(ctx, environment).ComputePool(computePool).Execute() + if pool, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePool); ok { + return pool, nil + } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return cmfsdk.ComputePool{}, fmt.Errorf(`failed to create compute pool "%s" in the environment "%s": %s`, computePoolName, environment, parsedErr) } @@ -427,6 +438,9 @@ func (cmfClient *CmfRestClient) DeleteComputePool(ctx context.Context, environme func (cmfClient *CmfRestClient) DescribeComputePool(ctx context.Context, environment, computePool string) (cmfsdk.ComputePool, error) { cmfComputePool, httpResponse, err := cmfClient.SQLApi.GetComputePool(ctx, environment, computePool).Execute() + if pool, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePool); ok { + return pool, nil + } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return cmfsdk.ComputePool{}, fmt.Errorf(`failed to describe compute pool "%s" in the environment "%s": %s`, computePool, environment, parsedErr) } @@ -442,6 +456,11 @@ func (cmfClient *CmfRestClient) ListComputePools(ctx context.Context, environmen for !done { computePoolsPage, httpResponse, err := cmfClient.SQLApi.GetComputePools(ctx, environment).Page(currentPageNumber).Size(pageSize).Execute() + if parsed, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePoolsPage); ok { + computePools = append(computePools, parsed...) + currentPageNumber, done = extractPageOptions(len(parsed), currentPageNumber) + continue + } if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { return nil, fmt.Errorf(`failed to list compute pools in the environment "%s": %s`, environment, parsedErr) } @@ -570,6 +589,81 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +// --- ComputePool deserialization workaround (SDK v0.0.6) --- +// +// The SDK types ComputePool.Status as *map[string]map[string]interface{} but the +// API returns a flat object like {"phase":"RUNNING","message":null}.These helpers +// re-parse the buffered response body with correct handling. +func unmarshalComputePool(data []byte) (cmfsdk.ComputePool, error) { + var wrapper struct { + cmfsdk.ComputePool + Status json.RawMessage `json:"status"` + } + if err := json.Unmarshal(data, &wrapper); err != nil { + return cmfsdk.ComputePool{}, err + } + pool := wrapper.ComputePool + var status struct { + Phase string `json:"phase"` + } + if json.Unmarshal(wrapper.Status, &status) == nil && status.Phase != "" { + s := map[string]map[string]interface{}{ + "phase": {"value": status.Phase}, + } + pool.Status = &s + } + return pool, nil +} + +func unmarshalComputePoolsPage(data []byte) ([]cmfsdk.ComputePool, error) { + var page struct { + Items []json.RawMessage `json:"items"` + } + if err := json.Unmarshal(data, &page); err != nil { + return nil, err + } + pools := make([]cmfsdk.ComputePool, 0, len(page.Items)) + for _, raw := range page.Items { + pool, err := unmarshalComputePool(raw) + if err != nil { + return nil, err + } + pools = append(pools, pool) + } + return pools, nil +} + +// readFallbackBody returns the HTTP response body when the SDK's Execute() returned +// an error despite a successful 2xx status — the ComputePool deserialization bug. +// Returns nil if the condition doesn't apply or the body can't be read. +func readFallbackBody(httpResponse *http.Response, err error) []byte { + if err == nil || httpResponse == nil || httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 { + return nil + } + body, readErr := io.ReadAll(httpResponse.Body) + if readErr != nil { + return nil + } + return body +} + +// tryComputePoolFallback runs the fallback parse for a ComputePool-bearing response. +// Returns (result, true) on success. On failure, re-buffers the response body so +// parseSdkError can still read it, and returns (zero, false). +func tryComputePoolFallback[T any](httpResponse *http.Response, err error, unmarshal func([]byte) (T, error)) (T, bool) { + var zero T + body := readFallbackBody(httpResponse, err) + if body == nil { + return zero, false + } + result, parseErr := unmarshal(body) + if parseErr != nil { + httpResponse.Body = io.NopCloser(bytes.NewBuffer(body)) + return zero, false + } + return result, true +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..bde52e9010 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -185,22 +185,23 @@ func createSavepoint(name string) cmfsdk.Savepoint { } } -func createComputePool(poolName, phase string) cmfsdk.ComputePool { +// createComputePool returns a raw map with a flat status matching the real CMF API response. +// The SDK v0.0.6 cannot deserialize this directly; the CLI's unmarshalComputePool fallback +// in cmf_rest_client.go handles it. +func createComputePool(poolName, phase string) map[string]interface{} { timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() - status := cmfsdk.ComputePoolStatus{ - Phase: phase, - } - - return cmfsdk.ComputePool{ - Metadata: cmfsdk.ComputePoolMetadata{ - Name: poolName, - CreationTimestamp: &timeStamp, + return map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": poolName, + "creationTimestamp": timeStamp, }, - Spec: cmfsdk.ComputePoolSpec{ - Type: "DEDICATED", + "spec": map[string]interface{}{ + "type": "DEDICATED", + }, + "status": map[string]interface{}{ + "phase": phase, }, - Status: &status, } } @@ -364,14 +365,15 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { err = json.Unmarshal(reqBody, &environment) require.NoError(t, err) - if strings.Contains(environment.Name, "failure") { + envName := environment.GetName() + if strings.Contains(envName, "failure") { http.Error(w, "", http.StatusUnprocessableEntity) return } // Already existing environment: update - if environment.Name == "default" || environment.Name == "test" { - outputEnvironment := createEnvironment(environment.Name, environment.Name+"-namespace") + if envName == "default" || envName == "test" { + outputEnvironment := createEnvironment(envName, envName+"-namespace") // This is a dummy update - only the defaults can be updated anyway. outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults @@ -382,7 +384,7 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { } // New environment: create - outputEnvironment := createEnvironment(environment.Name, environment.GetKubernetesNamespace()) + outputEnvironment := createEnvironment(envName, environment.GetKubernetesNamespace()) outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults outputEnvironment.StatementDefaults = environment.StatementDefaults @@ -782,16 +784,17 @@ func handleCmfComputePools(t *testing.T) http.HandlerFunc { switch r.Method { case http.MethodGet: - computePool1 := createComputePool("test-pool1", "RUNNING") - computePool2 := createComputePool("test-pool2", "PENDING") - computePool3 := createComputePool("test-pool3", "COMPLETE") - - computePools := []cmfsdk.ComputePool{computePool1, computePool2, computePool3} - computePoolsPage := cmfsdk.ComputePoolsPage{} + computePoolsPage := map[string]interface{}{ + "items": []interface{}{}, + } page := r.URL.Query().Get("page") if page == "0" { - computePoolsPage.SetItems(computePools) + computePoolsPage["items"] = []interface{}{ + createComputePool("test-pool1", "RUNNING"), + createComputePool("test-pool2", "PENDING"), + createComputePool("test-pool3", "COMPLETE"), + } } err := json.NewEncoder(w).Encode(computePoolsPage)