Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0
github.com/confluentinc/cmf-sdk-go v0.0.5
github.com/confluentinc/cmf-sdk-go v0.0.6
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0
github.com/confluentinc/go-editor v0.11.0
github.com/confluentinc/go-prompt v0.2.40
Expand All @@ -72,6 +72,7 @@ require (
github.com/go-jose/go-jose/v3 v3.0.5
github.com/gobuffalo/flect v1.0.2
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The repo already depends on go.uber.org/mock (and many tests import go.uber.org/mock/gomock), but this PR adds github.com/golang/mock and regenerates several mocks against that package. Mixing these two gomock packages will break compilation because the Controller/Call types are not interchangeable. Please standardize on one gomock implementation (either switch all tests + remaining mocks to github.com/golang/mock/gomock, or revert these regenerated mocks and the new dependency back to go.uber.org/mock/gomock) and regenerate consistently.

Suggested change
github.com/golang/mock v1.6.0

Copilot uses AI. Check for mistakes.
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
Expand Down Expand Up @@ -184,7 +185,6 @@ require (
github.com/gogo/googleapis v1.4.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/cel-go v0.20.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q=
github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE=
github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/cmf-sdk-go v0.0.6 h1:3BFyPOJb4xBAvBMU1hXSh9+2kn/U2zr4EKDoWf8QM74=
github.com/confluentinc/cmf-sdk-go v0.0.6/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 h1:y9wh3z7FdqN3RJ9IHW12hzytJx4KjlpviPWn4ncA5u0=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI=
github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU=
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(c.newCatalogCommand())
cmd.AddCommand(c.newDetachedSavepointCommand())
cmd.AddCommand(c.newEnvironmentCommand())
cmd.AddCommand(c.newKubernetesClusterCommand())
cmd.AddCommand(c.newSavepointCommand())
Comment on lines 41 to 44
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description's release notes and checklist are still placeholders/unchecked, but this PR adds a new user-facing CLI command group (flink kubernetes-cluster). Please update the PR description to include real release notes entries and complete the checklist (or remove irrelevant sections) so reviewers can assess breaking changes, customer impact, and verification steps.

Copilot uses AI. Check for mistakes.

// On-Prem and Cloud Commands
Expand Down
19 changes: 18 additions & 1 deletion internal/flink/command_compute_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,26 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool)

if sdkComputePool.Status != nil {
localPool.Status = &LocalComputePoolStatus{
Phase: sdkComputePool.Status.Phase,
Phase: computePoolPhase(sdkComputePool.Status),
}
}

return localPool
}

// computePoolPhase extracts the phase string from ComputePool.Status.
// SDK v0.0.6 incorrectly generates Status as *map[string]map[string]interface{}
// due to additionalProperties: true in the spec; the real API returns a flat
// {"phase": "RUNNING"} object which cannot be decoded into that type.
// Until the SDK is fixed upstream, phase will be empty when calling the real API.
func computePoolPhase(status *map[string]map[string]interface{}) string {
if status == nil {
return ""
}
if phaseMap, ok := (*status)["phase"]; ok {
if val, ok := phaseMap["value"].(string); ok {
return val
}
}
return ""
}
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkOutputComputePool.GetStatus().Phase,
Phase: computePoolPhase(sdkOutputComputePool.Status),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_describe_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkComputePool.GetStatus().Phase,
Phase: computePoolPhase(sdkComputePool.Status),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_list_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error {
CreationTime: creationTime,
Name: pool.GetMetadata().Name,
Type: pool.GetSpec().Type,
Phase: pool.GetStatus().Phase,
Phase: computePoolPhase(pool.Status),
})
}
return list.Print()
Expand Down
2 changes: 2 additions & 0 deletions internal/flink/command_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type flinkEnvironmentOutput struct {
Name string `human:"Name" serialized:"name"`
KubernetesNamespace string `human:"Kubernetes Namespace" serialized:"kubernetes_namespace"`
KubernetesClusterName string `human:"Kubernetes Cluster Name,omitempty" serialized:"kubernetes_cluster_name,omitempty"`
CreatedTime string `human:"Created Time" serialized:"created_time"`
UpdatedTime string `human:"Updated Time" serialized:"updated_time"`
FlinkApplicationDefaults string `human:"Flink Application Defaults,omitempty" serialized:"flink_application_defaults,omitempty"`
Expand Down Expand Up @@ -44,6 +45,7 @@ func convertSdkEnvironmentToLocalEnvironment(sdkOutputEnvironment cmfsdk.Environ
UpdatedTime: sdkOutputEnvironment.UpdatedTime,
FlinkApplicationDefaults: sdkOutputEnvironment.FlinkApplicationDefaults,
KubernetesNamespace: sdkOutputEnvironment.KubernetesNamespace,
KubernetesClusterName: sdkOutputEnvironment.GetKubernetesClusterName(),
ComputePoolDefaults: sdkOutputEnvironment.ComputePoolDefaults,
}

Expand Down
12 changes: 11 additions & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *command) newEnvironmentCreateCommand() *cobra.Command {
}

cmd.Flags().String("kubernetes-namespace", "", "Kubernetes namespace to deploy Flink applications to.")
cmd.Flags().String("kubernetes-cluster-name", "", "Name of the Kubernetes cluster to bind this environment to.")
cmd.Flags().String("defaults", "", "JSON string defining the environment's Flink application defaults, or path to a file to read defaults from (with .yml, .yaml or .json extension).")
cmd.Flags().String("statement-defaults", "", "JSON string defining the environment's Flink statement defaults, or path to a file to read defaults from (with .yml, .yaml or .json extension).")
cmd.Flags().String("compute-pool-defaults", "", "JSON string defining the environment's Flink compute pool defaults, or path to a file to read defaults from (with .yml, .yaml or .json extension).")
Expand Down Expand Up @@ -92,12 +93,20 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}
}

kubernetesClusterName, err := cmd.Flags().GetString("kubernetes-cluster-name")
if err != nil {
return err
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.KubernetesNamespace = &kubernetesNamespace
postEnvironment.StatementDefaults = &defaultsStatementParsed
postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed
if kubernetesClusterName != "" {
postEnvironment.SetKubernetesClusterName(kubernetesClusterName)
}

sdkOutputEnvironment, err := client.CreateEnvironment(c.createContext(), postEnvironment)
if err != nil {
Expand Down Expand Up @@ -174,6 +183,7 @@ func printEnvironmentOutTable(cmd *cobra.Command, outputEnvironment cmfsdk.Envir
table.Add(&flinkEnvironmentOutput{
Name: outputEnvironment.Name,
KubernetesNamespace: outputEnvironment.KubernetesNamespace,
KubernetesClusterName: outputEnvironment.GetKubernetesClusterName(),
FlinkApplicationDefaults: defaultsApplicationOutput,
ComputePoolDefaults: defaultComputePoolOutput,
DetachedStatementDefaults: defaultsDetachedStatementOutput,
Expand Down
11 changes: 6 additions & 5 deletions internal/flink/command_environment_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ func (c *command) environmentList(cmd *cobra.Command, _ []string) error {

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
list.Filter([]string{"Name", "CreatedTime", "UpdatedTime", "KubernetesNamespace"})
list.Filter([]string{"Name", "CreatedTime", "UpdatedTime", "KubernetesNamespace", "KubernetesClusterName"})
for _, env := range sdkEnvironments {
list.Add(&flinkEnvironmentOutput{
Name: env.Name,
KubernetesNamespace: env.KubernetesNamespace,
CreatedTime: env.CreatedTime.String(),
UpdatedTime: env.UpdatedTime.String(),
Name: env.Name,
KubernetesNamespace: env.KubernetesNamespace,
KubernetesClusterName: env.GetKubernetesClusterName(),
CreatedTime: env.CreatedTime.String(),
UpdatedTime: env.UpdatedTime.String(),
})
}
return list.Print()
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.StatementDefaults = &defaultsStatementParsed
postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed
Expand Down
61 changes: 61 additions & 0 deletions internal/flink/command_kubernetes_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package flink

import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type kubernetesClusterOutput struct {
Name string `human:"Name" serialized:"name"`
CreatedTime string `human:"Created Time" serialized:"created_time"`
UpdatedTime string `human:"Updated Time" serialized:"updated_time"`
LifecycleState string `human:"Lifecycle State,omitempty" serialized:"lifecycle_state,omitempty"`
ConnectionState string `human:"Connection State,omitempty" serialized:"connection_state,omitempty"`
KubernetesVersion string `human:"Kubernetes Version,omitempty" serialized:"kubernetes_version,omitempty"`
}

func (c *command) newKubernetesClusterCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "kubernetes-cluster",
Short: "Manage Kubernetes clusters registered with CMF.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

cmd.AddCommand(c.newKubernetesClusterListCommand())
cmd.AddCommand(c.newKubernetesClusterDescribeCommand())
cmd.AddCommand(c.newKubernetesClusterUpdateCommand())

return cmd
}

func convertSdkKubernetesClusterToLocal(cluster cmfsdk.KubernetesCluster) LocalKubernetesCluster {
local := LocalKubernetesCluster{
ApiVersion: cluster.ApiVersion,
Kind: cluster.Kind,
Metadata: LocalKubernetesClusterMetadata{
Name: cluster.Metadata.Name,
CreationTimestamp: cluster.Metadata.CreationTimestamp,
UpdateTimestamp: cluster.Metadata.UpdateTimestamp,
Uid: cluster.Metadata.Uid,
Labels: cluster.Metadata.Labels,
Annotations: cluster.Metadata.Annotations,
},
Spec: LocalKubernetesClusterSpec{
LifecycleState: cluster.Spec.LifecycleState,
},
}

if cluster.Status != nil {
local.Status = &LocalKubernetesClusterStatus{
State: cluster.Status.State,
Message: cluster.Status.Message,
LastHeartbeatTimestamp: cluster.Status.LastHeartbeatTimestamp,
KubernetesVersion: cluster.Status.KubernetesVersion,
}
}

return local
}
64 changes: 64 additions & 0 deletions internal/flink/command_kubernetes_cluster_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newKubernetesClusterDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <name>",
Short: "Describe a Kubernetes cluster registered with CMF.",
Args: cobra.ExactArgs(1),
RunE: c.kubernetesClusterDescribe,
}

addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) kubernetesClusterDescribe(cmd *cobra.Command, args []string) error {
client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

clusterName := args[0]
cluster, err := client.DescribeKubernetesCluster(c.createContext(), clusterName)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
o := &kubernetesClusterOutput{
Name: cluster.Metadata.Name,
}
if cluster.Metadata.CreationTimestamp != nil {
o.CreatedTime = *cluster.Metadata.CreationTimestamp
}
if cluster.Metadata.UpdateTimestamp != nil {
o.UpdatedTime = *cluster.Metadata.UpdateTimestamp
}
if cluster.Spec.LifecycleState != nil {
o.LifecycleState = *cluster.Spec.LifecycleState
}
if cluster.Status != nil {
if cluster.Status.State != nil {
o.ConnectionState = *cluster.Status.State
}
if cluster.Status.KubernetesVersion != nil {
o.KubernetesVersion = *cluster.Status.KubernetesVersion
}
}
table.Add(o)
return table.Print()
}

localCluster := convertSdkKubernetesClusterToLocal(cluster)
return output.SerializedOutput(cmd, localCluster)
}
64 changes: 64 additions & 0 deletions internal/flink/command_kubernetes_cluster_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newKubernetesClusterListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Kubernetes clusters registered with CMF.",
Args: cobra.NoArgs,
RunE: c.kubernetesClusterList,
}

addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) kubernetesClusterList(cmd *cobra.Command, _ []string) error {
client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

clusters, err := client.ListKubernetesClusters(c.createContext())
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
list.Filter([]string{"Name", "CreatedTime", "UpdatedTime", "LifecycleState", "ConnectionState"})
for _, cluster := range clusters {
o := &kubernetesClusterOutput{
Name: cluster.Metadata.Name,
}
if cluster.Metadata.CreationTimestamp != nil {
o.CreatedTime = *cluster.Metadata.CreationTimestamp
}
if cluster.Metadata.UpdateTimestamp != nil {
o.UpdatedTime = *cluster.Metadata.UpdateTimestamp
}
if cluster.Spec.LifecycleState != nil {
o.LifecycleState = *cluster.Spec.LifecycleState
}
if cluster.Status != nil && cluster.Status.State != nil {
o.ConnectionState = *cluster.Status.State
}
list.Add(o)
}
return list.Print()
}

localClusters := make([]LocalKubernetesCluster, 0, len(clusters))
for _, cluster := range clusters {
localClusters = append(localClusters, convertSdkKubernetesClusterToLocal(cluster))
}
return output.SerializedOutput(cmd, localClusters)
}
Loading