Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0
github.com/confluentinc/cmf-sdk-go v0.0.5
github.com/confluentinc/cmf-sdk-go v0.0.6
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0
github.com/confluentinc/go-editor v0.11.0
github.com/confluentinc/go-prompt v0.2.40
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q=
github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE=
github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/cmf-sdk-go v0.0.6 h1:3BFyPOJb4xBAvBMU1hXSh9+2kn/U2zr4EKDoWf8QM74=
github.com/confluentinc/cmf-sdk-go v0.0.6/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 h1:y9wh3z7FdqN3RJ9IHW12hzytJx4KjlpviPWn4ncA5u0=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI=
github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU=
Expand Down
19 changes: 17 additions & 2 deletions internal/flink/command_compute_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func (c *command) validComputePoolArgs(cmd *cobra.Command, args []string) []stri
return c.autocompleteComputePools(cmd, args)
}

// extractComputePoolPhase extracts the phase string from the generic status map.
// ComputePool.Status is typed as *map[string]map[string]interface{} in SDK v0.0.6.
func extractComputePoolPhase(pool cmfsdk.ComputePool) string {
if pool.Status == nil {
return ""
}
status := pool.GetStatus()
if phaseMap, ok := status["phase"]; ok {
if value, ok := phaseMap["value"].(string); ok {
return value
}
}
return ""
}
Comment on lines +63 to +76
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make sense to move this utility into the SDK?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we don't do human code push in cmf-go-sdk, but if we want to : Adds a custom UnmarshalJSON method on the ComputePool type (in a new file v1/model_compute_pool_custom.go ) seems to be the better fix, which completely removes "custom unmarshaling with json.RawMessage embedding" from cli repo

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we don't do human code push in cmf-go-sdk, but if we want to

My only worry about human pushes in the sdk repo is that the code generator is overwriting human written code / utils.
But when we started the go sdk repo, I actually had additional, non generated code for shared utilities in mind.


func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) LocalComputePool {
localPool := LocalComputePool{
ApiVersion: sdkComputePool.ApiVersion,
Expand All @@ -77,9 +92,9 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool)
},
}

if sdkComputePool.Status != nil {
if phase := extractComputePoolPhase(sdkComputePool); phase != "" {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we always call this method, also when the CLI runs against older CMF versions? Probably yes? because the response has not changed, only the SDK.

localPool.Status = &LocalComputePoolStatus{
Phase: sdkComputePool.Status.Phase,
Phase: phase,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkOutputComputePool.GetStatus().Phase,
Phase: extractComputePoolPhase(sdkOutputComputePool),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_describe_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkComputePool.GetStatus().Phase,
Phase: extractComputePoolPhase(sdkComputePool),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_list_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error {
CreationTime: creationTime,
Name: pool.GetMetadata().Name,
Type: pool.GetSpec().Type,
Phase: pool.GetStatus().Phase,
Phase: extractComputePoolPhase(pool),
})
}
return list.Print()
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.KubernetesNamespace = &kubernetesNamespace
postEnvironment.StatementDefaults = &defaultsStatementParsed
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.StatementDefaults = &defaultsStatementParsed
postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed
Expand Down
102 changes: 98 additions & 4 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package flink

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -231,7 +233,10 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme
// CreateEnvironment Create an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before creation.
func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
environmentName := postEnvironment.Name
environmentName := postEnvironment.GetName()
if environmentName == "" {
return cmfsdk.Environment{}, fmt.Errorf("environment name is required")
}
_, httpResponse, _ := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute()
// check if the environment exists by checking the status code
if httpResponse != nil && httpResponse.StatusCode == http.StatusOK {
Expand Down Expand Up @@ -281,10 +286,13 @@ func (cmfClient *CmfRestClient) ListEnvironments(ctx context.Context) ([]cmfsdk.
return environments, nil
}

// UpdateEnvironment Create an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before updation.
// UpdateEnvironment updates an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before updating.
func (cmfClient *CmfRestClient) UpdateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
environmentName := postEnvironment.Name
environmentName := postEnvironment.GetName()
if environmentName == "" {
return cmfsdk.Environment{}, fmt.Errorf("environment name is required")
}
_, httpResponse, err := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute()
// check if the environment exists by checking the status code
if httpResponse != nil && httpResponse.StatusCode == http.StatusNotFound {
Expand Down Expand Up @@ -414,6 +422,9 @@ func (cmfClient *CmfRestClient) CreateComputePool(ctx context.Context, environme
return cmfsdk.ComputePool{}, fmt.Errorf("compute pool name is required")
}
outputComputePool, httpResponse, err := cmfClient.SQLApi.CreateComputePool(ctx, environment).ComputePool(computePool).Execute()
if pool, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePool); ok {
return pool, nil
}
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return cmfsdk.ComputePool{}, fmt.Errorf(`failed to create compute pool "%s" in the environment "%s": %s`, computePoolName, environment, parsedErr)
}
Expand All @@ -427,6 +438,9 @@ func (cmfClient *CmfRestClient) DeleteComputePool(ctx context.Context, environme

func (cmfClient *CmfRestClient) DescribeComputePool(ctx context.Context, environment, computePool string) (cmfsdk.ComputePool, error) {
cmfComputePool, httpResponse, err := cmfClient.SQLApi.GetComputePool(ctx, environment, computePool).Execute()
if pool, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePool); ok {
return pool, nil
}
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return cmfsdk.ComputePool{}, fmt.Errorf(`failed to describe compute pool "%s" in the environment "%s": %s`, computePool, environment, parsedErr)
}
Expand All @@ -442,6 +456,11 @@ func (cmfClient *CmfRestClient) ListComputePools(ctx context.Context, environmen

for !done {
computePoolsPage, httpResponse, err := cmfClient.SQLApi.GetComputePools(ctx, environment).Page(currentPageNumber).Size(pageSize).Execute()
if parsed, ok := tryComputePoolFallback(httpResponse, err, unmarshalComputePoolsPage); ok {
computePools = append(computePools, parsed...)
currentPageNumber, done = extractPageOptions(len(parsed), currentPageNumber)
continue
}
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return nil, fmt.Errorf(`failed to list compute pools in the environment "%s": %s`, environment, parsedErr)
}
Expand Down Expand Up @@ -570,6 +589,81 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s
return parseSdkError(httpResp, err)
}

// --- ComputePool deserialization workaround (SDK v0.0.6) ---
//
// The SDK types ComputePool.Status as *map[string]map[string]interface{} but the
// API returns a flat object like {"phase":"RUNNING","message":null}.These helpers
// re-parse the buffered response body with correct handling.
func unmarshalComputePool(data []byte) (cmfsdk.ComputePool, error) {
var wrapper struct {
cmfsdk.ComputePool
Status json.RawMessage `json:"status"`
}
if err := json.Unmarshal(data, &wrapper); err != nil {
return cmfsdk.ComputePool{}, err
}
pool := wrapper.ComputePool
var status struct {
Phase string `json:"phase"`
}
if json.Unmarshal(wrapper.Status, &status) == nil && status.Phase != "" {
s := map[string]map[string]interface{}{
"phase": {"value": status.Phase},
}
pool.Status = &s
}
return pool, nil
}

func unmarshalComputePoolsPage(data []byte) ([]cmfsdk.ComputePool, error) {
var page struct {
Items []json.RawMessage `json:"items"`
}
if err := json.Unmarshal(data, &page); err != nil {
return nil, err
}
pools := make([]cmfsdk.ComputePool, 0, len(page.Items))
for _, raw := range page.Items {
pool, err := unmarshalComputePool(raw)
if err != nil {
return nil, err
}
pools = append(pools, pool)
}
return pools, nil
}

// readFallbackBody returns the HTTP response body when the SDK's Execute() returned
// an error despite a successful 2xx status — the ComputePool deserialization bug.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is "the ComputePool deserialization bug"? I don't think this is a bug?
It's just that the SDK's interfaces have changed

// Returns nil if the condition doesn't apply or the body can't be read.
func readFallbackBody(httpResponse *http.Response, err error) []byte {
if err == nil || httpResponse == nil || httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 {
return nil
}
body, readErr := io.ReadAll(httpResponse.Body)
if readErr != nil {
return nil
}
return body
}

// tryComputePoolFallback runs the fallback parse for a ComputePool-bearing response.
// Returns (result, true) on success. On failure, re-buffers the response body so
// parseSdkError can still read it, and returns (zero, false).
func tryComputePoolFallback[T any](httpResponse *http.Response, err error, unmarshal func([]byte) (T, error)) (T, bool) {
var zero T
body := readFallbackBody(httpResponse, err)
if body == nil {
return zero, false
}
result, parseErr := unmarshal(body)
if parseErr != nil {
httpResponse.Body = io.NopCloser(bytes.NewBuffer(body))
return zero, false
}
return result, true
}

// Returns the next page number and whether we need to fetch more pages or not.
func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) {
if receivedItemsLength == 0 {
Expand Down
49 changes: 26 additions & 23 deletions test/test-server/flink_onprem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,23 @@
}
}

func createComputePool(poolName, phase string) cmfsdk.ComputePool {
// createComputePool returns a raw map with a flat status matching the real CMF API response.
// The SDK v0.0.6 cannot deserialize this directly; the CLI's unmarshalComputePool fallback
// in cmf_rest_client.go handles it.
func createComputePool(poolName, phase string) map[string]interface{} {
timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String()

status := cmfsdk.ComputePoolStatus{
Phase: phase,
}

return cmfsdk.ComputePool{
Metadata: cmfsdk.ComputePoolMetadata{
Name: poolName,
CreationTimestamp: &timeStamp,
return map[string]interface{}{
"metadata": map[string]interface{}{
"name": poolName,
"creationTimestamp": timeStamp,
},
Spec: cmfsdk.ComputePoolSpec{
Type: "DEDICATED",
"spec": map[string]interface{}{
"type": "DEDICATED",
},
"status": map[string]interface{}{
"phase": phase,
},
Status: &status,
}
}

Expand Down Expand Up @@ -364,14 +365,15 @@
err = json.Unmarshal(reqBody, &environment)
require.NoError(t, err)

if strings.Contains(environment.Name, "failure") {
envName := environment.GetName()
if strings.Contains(envName, "failure") {
http.Error(w, "", http.StatusUnprocessableEntity)
return
}

// Already existing environment: update
if environment.Name == "default" || environment.Name == "test" {
outputEnvironment := createEnvironment(environment.Name, environment.Name+"-namespace")
if envName == "default" || envName == "test" {
outputEnvironment := createEnvironment(envName, envName+"-namespace")

Check failure on line 376 in test/test-server/flink_onprem_handler.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "-namespace" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3320&issues=91ed0128-7c1f-4cc3-89e4-28128ad4dcd5&open=91ed0128-7c1f-4cc3-89e4-28128ad4dcd5
// This is a dummy update - only the defaults can be updated anyway.
outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults
outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults
Expand All @@ -382,7 +384,7 @@
}

// New environment: create
outputEnvironment := createEnvironment(environment.Name, environment.GetKubernetesNamespace())
outputEnvironment := createEnvironment(envName, environment.GetKubernetesNamespace())
outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults
outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults
outputEnvironment.StatementDefaults = environment.StatementDefaults
Expand Down Expand Up @@ -782,16 +784,17 @@

switch r.Method {
case http.MethodGet:
computePool1 := createComputePool("test-pool1", "RUNNING")
computePool2 := createComputePool("test-pool2", "PENDING")
computePool3 := createComputePool("test-pool3", "COMPLETE")

computePools := []cmfsdk.ComputePool{computePool1, computePool2, computePool3}
computePoolsPage := cmfsdk.ComputePoolsPage{}
computePoolsPage := map[string]interface{}{
"items": []interface{}{},
}
page := r.URL.Query().Get("page")

if page == "0" {
computePoolsPage.SetItems(computePools)
computePoolsPage["items"] = []interface{}{
createComputePool("test-pool1", "RUNNING"),
createComputePool("test-pool2", "PENDING"),
createComputePool("test-pool3", "COMPLETE"),
}
}

err := json.NewEncoder(w).Encode(computePoolsPage)
Expand Down