Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (c *command) newApplicationCommand() *cobra.Command {
cmd.AddCommand(c.newApplicationCreateCommand())
cmd.AddCommand(c.newApplicationDeleteCommand())
cmd.AddCommand(c.newApplicationDescribeCommand())
cmd.AddCommand(c.newApplicationEventCommand())
cmd.AddCommand(c.newApplicationListCommand())
cmd.AddCommand(c.newApplicationUpdateCommand())
cmd.AddCommand(c.newApplicationWebUiForwardCommand())
Expand Down
48 changes: 48 additions & 0 deletions internal/flink/command_application_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package flink

import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type flinkApplicationEventOut struct {
Name string `human:"Name" serialized:"name"`
Type string `human:"Type" serialized:"type"`
Timestamp string `human:"Timestamp" serialized:"timestamp"`
Instance string `human:"Instance" serialized:"instance"`
Message string `human:"Message" serialized:"message"`
}

func (c *command) newApplicationEventCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "event",
Short: "Manage Flink application events.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

cmd.AddCommand(c.newApplicationEventListCommand())

return cmd
}

func convertSdkEventToLocalEvent(sdkEvent cmfsdk.FlinkApplicationEvent) LocalFlinkApplicationEvent {
return LocalFlinkApplicationEvent{
ApiVersion: sdkEvent.ApiVersion,
Kind: sdkEvent.Kind,
Metadata: LocalEventMetadata{
Name: sdkEvent.Metadata.Name,
Uid: sdkEvent.Metadata.Uid,
CreationTimestamp: sdkEvent.Metadata.CreationTimestamp,
FlinkApplicationInstance: sdkEvent.Metadata.FlinkApplicationInstance,
Labels: sdkEvent.Metadata.Labels,
Annotations: sdkEvent.Metadata.Annotations,
},
Status: LocalEventStatus{
Message: sdkEvent.Status.Message,
Type: sdkEvent.Status.Type,
},
}
}
70 changes: 70 additions & 0 deletions internal/flink/command_application_event_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newApplicationEventListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Flink application events.",
Args: cobra.NoArgs,
RunE: c.applicationEventList,
}

cmd.Flags().String("environment", "", "Name of the Flink environment.")
cmd.Flags().String("application", "", "Name of the Flink application.")
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("environment"))
cobra.CheckErr(cmd.MarkFlagRequired("application"))

return cmd
}

func (c *command) applicationEventList(cmd *cobra.Command, _ []string) error {
environment, err := cmd.Flags().GetString("environment")
if err != nil {
return err
}

application, err := cmd.Flags().GetString("application")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

events, err := client.ListApplicationEvents(c.createContext(), environment, application)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, event := range events {
list.Add(&flinkApplicationEventOut{
Name: event.Metadata.GetName(),
Type: event.Status.GetType(),
Timestamp: event.Metadata.GetCreationTimestamp(),
Instance: event.Metadata.GetFlinkApplicationInstance(),
Message: event.Status.GetMessage(),
})
}
return list.Print()
}

localEvents := make([]LocalFlinkApplicationEvent, 0, len(events))
for _, sdkEvent := range events {
localEvents = append(localEvents, convertSdkEventToLocalEvent(sdkEvent))
}

return output.SerializedOutput(cmd, localEvents)
}
24 changes: 24 additions & 0 deletions internal/flink/local_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,30 @@ type LocalFlinkApplication struct {
Status *map[string]interface{} `json:"status,omitempty" yaml:"status,omitempty"`
}

type LocalFlinkApplicationEvent struct {
ApiVersion string `json:"apiVersion" yaml:"apiVersion"`
Kind string `json:"kind" yaml:"kind"`
Metadata LocalEventMetadata `json:"metadata" yaml:"metadata"`
Status LocalEventStatus `json:"status" yaml:"status"`
}

type LocalEventMetadata struct {
Name *string `json:"name,omitempty" yaml:"name,omitempty"`
Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"`
CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
FlinkApplicationInstance *string `json:"flinkApplicationInstance,omitempty" yaml:"flinkApplicationInstance,omitempty"`
Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"`
}

// LocalEventStatus maps the SDK EventStatus but intentionally omits the Data field.
// Data is a oneOf union type (EventDataJobException | EventDataNewStatus) that does not
// map cleanly to a simple local struct and is not needed for the CLI list output.
type LocalEventStatus struct {
Message *string `json:"message,omitempty" yaml:"message,omitempty"`
Type *string `json:"type,omitempty" yaml:"type,omitempty"`
}

type LocalKafkaCatalog struct {
ApiVersion string `json:"apiVersion" yaml:"apiVersion"`
Kind string `json:"kind" yaml:"kind"`
Expand Down
18 changes: 18 additions & 0 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,24 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme
return outputApplication, nil
}

func (cmfClient *CmfRestClient) ListApplicationEvents(ctx context.Context, environment, application string) ([]cmfsdk.FlinkApplicationEvent, error) {
events := make([]cmfsdk.FlinkApplicationEvent, 0)
var currentPageNumber int32 = 0
const pageSize = 100
done := false

for !done {
eventsPage, httpResponse, err := cmfClient.FlinkApplicationsApi.GetApplicationEvents(ctx, environment, application).Page(currentPageNumber).Size(pageSize).Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return nil, fmt.Errorf(`failed to list events for application "%s" in the environment "%s": %s`, application, environment, parsedErr)
}
events = append(events, eventsPage.GetItems()...)
currentPageNumber, done = extractPageOptions(len(eventsPage.GetItems()), currentPageNumber)
}

return events, nil
}

// CreateEnvironment Create an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before creation.
func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Error: required flag(s) "application" not set
Usage:
confluent flink application event list [flags]

Flags:
--environment string REQUIRED: Name of the Flink environment.
--application string REQUIRED: Name of the Flink application.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
None found.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Name | Type | Timestamp | Instance | Message
------------+---------+----------------------+----------------------------------+---------------------------------
event-001 | Normal | 2024-01-15T10:30:00Z | default-application-1-instance-1 | Application started
| | | | successfully
event-002 | Warning | 2024-01-15T10:31:00Z | default-application-1-instance-1 | Application restarting due to
| | | | failure
28 changes: 28 additions & 0 deletions test/fixtures/output/flink/application/event-list-json.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[
{
"apiVersion": "cmf.confluent.io/v1alpha1",
"kind": "FlinkApplicationEvent",
"metadata": {
"name": "event-001",
"creationTimestamp": "2024-01-15T10:30:00Z",
"flinkApplicationInstance": "default-application-1-instance-1"
},
"status": {
"message": "Application started successfully",
"type": "Normal"
}
},
{
"apiVersion": "cmf.confluent.io/v1alpha1",
"kind": "FlinkApplicationEvent",
"metadata": {
"name": "event-002",
"creationTimestamp": "2024-01-15T10:31:00Z",
"flinkApplicationInstance": "default-application-1-instance-1"
},
"status": {
"message": "Application restarting due to failure",
"type": "Warning"
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Error: required flag(s) "environment", "application" not set
Usage:
confluent flink application event list [flags]

Flags:
--environment string REQUIRED: Name of the Flink environment.
--application string REQUIRED: Name of the Flink application.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: failed to list events for application "non-existent" in the environment "default": Application not found
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: failed to list events for application "some-app" in the environment "non-existent": Environment not found
18 changes: 18 additions & 0 deletions test/fixtures/output/flink/application/event-list-yaml.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
- apiVersion: cmf.confluent.io/v1alpha1
kind: FlinkApplicationEvent
metadata:
name: event-001
creationTimestamp: "2024-01-15T10:30:00Z"
flinkApplicationInstance: default-application-1-instance-1
status:
message: Application started successfully
type: Normal
- apiVersion: cmf.confluent.io/v1alpha1
kind: FlinkApplicationEvent
metadata:
name: event-002
creationTimestamp: "2024-01-15T10:31:00Z"
flinkApplicationInstance: default-application-1-instance-1
status:
message: Application restarting due to failure
type: Warning
14 changes: 14 additions & 0 deletions test/fixtures/output/flink/application/event/help-onprem.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Manage Flink application events.

Usage:
confluent flink application event [command]

Available Commands:
list List Flink application events.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Use "confluent flink application event [command] --help" for more information about a command.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
List Flink application events.

Usage:
confluent flink application event list [flags]

Flags:
--environment string REQUIRED: Name of the Flink environment.
--application string REQUIRED: Name of the Flink application.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Available Commands:
create Create a Flink application.
delete Delete one or more Flink applications.
describe Describe a Flink application.
event Manage Flink application events.
list List Flink applications.
update Update a Flink application.
web-ui-forward Forward the web UI of a Flink application.
Expand Down
17 changes: 17 additions & 0 deletions test/flink_onprem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ func (s *CLITestSuite) TestFlinkApplicationList() {
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationEventList() {
tests := []CLITest{
// failure scenarios
{args: "flink application event list", fixture: "flink/application/event-list-missing-flags.golden", exitCode: 1},
{args: "flink application event list --environment default", fixture: "flink/application/event-list-app-missing.golden", exitCode: 1},
{args: "flink application event list --environment non-existent --application some-app", fixture: "flink/application/event-list-non-existent-env.golden", exitCode: 1},
{args: "flink application event list --environment default --application non-existent", fixture: "flink/application/event-list-non-existent-app.golden", exitCode: 1},
// success scenarios
{args: "flink application event list --environment test --application non-existent", fixture: "flink/application/event-list-empty.golden"},
{args: "flink application event list --environment default --application default-application-1 --output human", fixture: "flink/application/event-list-human.golden"},
{args: "flink application event list --environment default --application default-application-1 --output json", fixture: "flink/application/event-list-json.golden"},
{args: "flink application event list --environment default --application default-application-1 --output yaml", fixture: "flink/application/event-list-yaml.golden"},
}

runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationDelete() {
tests := []CLITest{
// failure scenarios
Expand Down
Loading