diff --git a/internal/flink/command.go b/internal/flink/command.go index b5cd215b75..8b503aa2c8 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -41,6 +41,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { cmd.AddCommand(c.newDetachedSavepointCommand()) cmd.AddCommand(c.newEnvironmentCommand()) cmd.AddCommand(c.newSavepointCommand()) + cmd.AddCommand(c.newSystemInfoCommand()) // On-Prem and Cloud Commands cmd.AddCommand(c.newComputePoolCommand(cfg)) diff --git a/internal/flink/command_system_info.go b/internal/flink/command_system_info.go new file mode 100644 index 0000000000..9acce7b985 --- /dev/null +++ b/internal/flink/command_system_info.go @@ -0,0 +1,89 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +type systemInfoOut struct { + Version string `human:"Version" serialized:"version"` + Revision string `human:"Revision" serialized:"revision"` +} + +type localSystemInformation struct { + Status *localSystemInformationStatus `json:"status,omitempty" yaml:"status,omitempty"` +} + +type localSystemInformationStatus struct { + Version *string `json:"version,omitempty" yaml:"version,omitempty"` + Revision *string `json:"revision,omitempty" yaml:"revision,omitempty"` +} + +func (c *command) newSystemInfoCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "system-info", + Short: "Display CMF system information.", + Args: cobra.NoArgs, + RunE: c.systemInfo, + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, + } + + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) systemInfo(cmd *cobra.Command, _ []string) error { + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + result, err := client.GetSystemInformation(c.createContext()) + if err != nil { + return err + } + + sysInfo := parseSystemInformation(result) + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + table.Add(&systemInfoOut{ + Version: derefString(sysInfo.Status.Version), + Revision: derefString(sysInfo.Status.Revision), + }) + return table.Print() + } + + return output.SerializedOutput(cmd, sysInfo) +} + +func parseSystemInformation(raw map[string]interface{}) localSystemInformation { + sysInfo := localSystemInformation{} + + statusMap, ok := raw["status"].(map[string]interface{}) + if !ok { + return sysInfo + } + + status := &localSystemInformationStatus{} + if v, ok := statusMap["version"].(string); ok { + status.Version = &v + } + if v, ok := statusMap["revision"].(string); ok { + status.Revision = &v + } + sysInfo.Status = status + + return sysInfo +} + +func derefString(s *string) string { + if s == nil { + return "" + } + return *s +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..7e4518b492 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -2,6 +2,7 @@ package flink import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -35,6 +36,7 @@ type CmfClientInterface interface { DeleteStatement(ctx context.Context, environment, statement string) error UpdateStatement(ctx context.Context, environment, statementName string, statement cmfsdk.Statement) error GetStatementResults(ctx context.Context, environment, statementName, pageToken string) (cmfsdk.StatementResult, error) + GetSystemInformation(ctx context.Context) (map[string]interface{}, error) CmfApiContext() context.Context } @@ -529,6 +531,46 @@ func (cmfClient *CmfRestClient) GetStatementResults(ctx context.Context, environ return resp, nil } +func (cmfClient *CmfRestClient) GetSystemInformation(ctx context.Context) (map[string]interface{}, error) { + baseURL := cmfClient.GetConfig().Servers[0].URL + url := baseURL + "/cmf/api/v1/system-information" + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create system information request: %s", err) + } + + if token, ok := ctx.Value(cmfsdk.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + } + + resp, err := cmfClient.GetConfig().HTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to get system information: %s", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read system information response: %s", err) + } + + if resp.StatusCode != http.StatusOK { + trimmed := strings.TrimSpace(string(body)) + if trimmed != "" { + return nil, errors.New(trimmed) + } + return nil, errors.New(resp.Status) + } + + var result map[string]interface{} + if err := json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse system information response: %s", err) + } + + return result, nil +} + func (cmfClient *CmfRestClient) CreateCatalog(ctx context.Context, kafkaCatalog cmfsdk.KafkaCatalog) (cmfsdk.KafkaCatalog, error) { catalogName := kafkaCatalog.Metadata.Name outputCatalog, httpResponse, err := cmfClient.SQLApi.CreateKafkaCatalog(ctx).KafkaCatalog(kafkaCatalog).Execute() diff --git a/pkg/flink/test/mock/cmf_client_mock.go b/pkg/flink/test/mock/cmf_client_mock.go index f5cafc4cf7..dfe5fc37b1 100644 --- a/pkg/flink/test/mock/cmf_client_mock.go +++ b/pkg/flink/test/mock/cmf_client_mock.go @@ -114,6 +114,21 @@ func (mr *MockCmfClientInterfaceMockRecorder) GetStatementResults(ctx, environme return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatementResults", reflect.TypeOf((*MockCmfClientInterface)(nil).GetStatementResults), ctx, environment, statementName, pageToken) } +// GetSystemInformation mocks base method. +func (m *MockCmfClientInterface) GetSystemInformation(ctx context.Context) (map[string]interface{}, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSystemInformation", ctx) + ret0, _ := ret[0].(map[string]interface{}) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSystemInformation indicates an expected call of GetSystemInformation. +func (mr *MockCmfClientInterfaceMockRecorder) GetSystemInformation(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSystemInformation", reflect.TypeOf((*MockCmfClientInterface)(nil).GetSystemInformation), ctx) +} + // ListStatementExceptions mocks base method. func (m *MockCmfClientInterface) ListStatementExceptions(ctx context.Context, environment, statementName string) (v1.StatementExceptionList, error) { m.ctrl.T.Helper() diff --git a/test/fixtures/output/flink/help-onprem.golden b/test/fixtures/output/flink/help-onprem.golden index 7a1902639d..7549e0b46b 100644 --- a/test/fixtures/output/flink/help-onprem.golden +++ b/test/fixtures/output/flink/help-onprem.golden @@ -11,6 +11,7 @@ Available Commands: environment Manage Flink environments. savepoint Manage Flink savepoints. statement Manage Flink SQL statements. + system-info Display CMF system information. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/system-info-help-onprem.golden b/test/fixtures/output/flink/system-info-help-onprem.golden new file mode 100644 index 0000000000..5c46d98638 --- /dev/null +++ b/test/fixtures/output/flink/system-info-help-onprem.golden @@ -0,0 +1,16 @@ +Display CMF system information. + +Usage: + confluent flink system-info [flags] + +Flags: + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/system-info-json.golden b/test/fixtures/output/flink/system-info-json.golden new file mode 100644 index 0000000000..4e152f174b --- /dev/null +++ b/test/fixtures/output/flink/system-info-json.golden @@ -0,0 +1,6 @@ +{ + "status": { + "version": "1.0.0", + "revision": "abc1234def5678" + } +} diff --git a/test/fixtures/output/flink/system-info-yaml.golden b/test/fixtures/output/flink/system-info-yaml.golden new file mode 100644 index 0000000000..581eefa6ff --- /dev/null +++ b/test/fixtures/output/flink/system-info-yaml.golden @@ -0,0 +1,3 @@ +status: + version: 1.0.0 + revision: abc1234def5678 diff --git a/test/fixtures/output/flink/system-info.golden b/test/fixtures/output/flink/system-info.golden new file mode 100644 index 0000000000..0c63eed3a9 --- /dev/null +++ b/test/fixtures/output/flink/system-info.golden @@ -0,0 +1,4 @@ ++----------+----------------+ +| Version | 1.0.0 | +| Revision | abc1234def5678 | ++----------+----------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..fa9b8972e8 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -511,6 +511,16 @@ func (s *CLITestSuite) TestFlinkStatementExceptionListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkSystemInfo() { + tests := []CLITest{ + {args: "flink system-info", fixture: "flink/system-info.golden"}, + {args: "flink system-info --output json", fixture: "flink/system-info-json.golden"}, + {args: "flink system-info --output yaml", fixture: "flink/system-info-yaml.golden"}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkOnPremWithCloudLogin() { test := CLITest{args: "flink environment list --output json", fixture: "flink/environment/list-cloud.golden", login: "cloud", exitCode: 1} s.runIntegrationTest(test) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..0e12bd5028 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -1259,3 +1259,23 @@ func handleCmfStatementExceptions(t *testing.T) http.HandlerFunc { } } } + +func handleCmfSystemInformation(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + + switch r.Method { + case http.MethodGet: + sysInfo := map[string]interface{}{ + "status": map[string]interface{}{ + "version": "1.0.0", + "revision": "abc1234def5678", + }, + } + err := json.NewEncoder(w).Encode(sysInfo) + require.NoError(t, err) + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index 2267f01d5a..99032e68ee 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -24,6 +24,7 @@ var flinkRoutes = []route{ {"/cmf/api/v1/environments/{envName}/statements/{stmtName}/savepoints/{savepointName}", handleCmfSavepoint}, {"/cmf/api/v1/detached-savepoints", handleCmfDetachedSavepoints}, {"/cmf/api/v1/detached-savepoints/{detachedSavepointName}", handleCmfDetachedSavepoint}, + {"/cmf/api/v1/system-information", handleCmfSystemInformation}, } func NewFlinkOnPremRouter(t *testing.T) *mux.Router {