From c7b24ac097a1cd26598554c6b8955739d985cc3f Mon Sep 17 00:00:00 2001 From: machichima Date: Thu, 23 Apr 2026 16:41:26 +0800 Subject: [PATCH 1/6] feat: add app conditions model and migration sql Signed-off-by: machichima --- app/internal/migrations/migrations.go | 18 ++++++++++++++++++ .../sql/20260423000000_app_conditions.sql | 13 +++++++++++++ .../sql/20260423000000_app_conditions_down.sql | 1 + .../repository/models/app_conditions.go | 12 ++++++++++++ 4 files changed, 44 insertions(+) create mode 100644 app/internal/migrations/migrations.go create mode 100644 app/internal/migrations/sql/20260423000000_app_conditions.sql create mode 100644 app/internal/migrations/sql/20260423000000_app_conditions_down.sql create mode 100644 app/internal/repository/models/app_conditions.go diff --git a/app/internal/migrations/migrations.go b/app/internal/migrations/migrations.go new file mode 100644 index 00000000000..698d39aab8a --- /dev/null +++ b/app/internal/migrations/migrations.go @@ -0,0 +1,18 @@ +package migrations + +import ( + "context" + "embed" + + "github.com/jmoiron/sqlx" + + "github.com/flyteorg/flyte/v2/flytestdlib/database" +) + +//go:embed sql/*.sql +var migrationFS embed.FS + +// RunMigrations applies all pending app migrations. +func RunMigrations(ctx context.Context, db *sqlx.DB) error { + return database.Migrate(ctx, db, "app", migrationFS) +} diff --git a/app/internal/migrations/sql/20260423000000_app_conditions.sql b/app/internal/migrations/sql/20260423000000_app_conditions.sql new file mode 100644 index 00000000000..ddaeac237ab --- /dev/null +++ b/app/internal/migrations/sql/20260423000000_app_conditions.sql @@ -0,0 +1,13 @@ +-- App conditions history table. +-- Stores the append-only condition log for each app. +-- KService CRD remains the source of truth for live status (spec, replicas, ingress). +-- This table only persists the conditions array for historical display. + +CREATE TABLE IF NOT EXISTS app_conditions ( + project TEXT NOT NULL, + domain TEXT NOT NULL, + name TEXT NOT NULL, + conditions BYTEA NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (project, domain, name) +); diff --git a/app/internal/migrations/sql/20260423000000_app_conditions_down.sql b/app/internal/migrations/sql/20260423000000_app_conditions_down.sql new file mode 100644 index 00000000000..816bf6fc8f9 --- /dev/null +++ b/app/internal/migrations/sql/20260423000000_app_conditions_down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS app_conditions; diff --git a/app/internal/repository/models/app_conditions.go b/app/internal/repository/models/app_conditions.go new file mode 100644 index 00000000000..1234fb8938c --- /dev/null +++ b/app/internal/repository/models/app_conditions.go @@ -0,0 +1,12 @@ +package models + +import "time" + +// AppConditions stores the serialized condition history for a single app. +type AppConditions struct { + Project string `db:"project"` + Domain string `db:"domain"` + Name string `db:"name"` + Conditions []byte `db:"conditions"` // proto-serialized flyteidl2.app.ConditionList + UpdatedAt time.Time `db:"updated_at"` +} From 36b12611ca03804e31a68c551fb6db82e155c239 Mon Sep 17 00:00:00 2001 From: machichima Date: Fri, 24 Apr 2026 08:40:03 +0800 Subject: [PATCH 2/6] feat: add app DB implementation Signed-off-by: machichima --- .../repository/impl/app_conditions.go | 133 ++++++++++++++++++ .../repository/interfaces/app_conditions.go | 24 ++++ 2 files changed, 157 insertions(+) create mode 100644 app/internal/repository/impl/app_conditions.go create mode 100644 app/internal/repository/interfaces/app_conditions.go diff --git a/app/internal/repository/impl/app_conditions.go b/app/internal/repository/impl/app_conditions.go new file mode 100644 index 00000000000..c9c5884baa1 --- /dev/null +++ b/app/internal/repository/impl/app_conditions.go @@ -0,0 +1,133 @@ +package impl + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/jmoiron/sqlx" + "google.golang.org/protobuf/proto" + + "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" +) + +var _ interfaces.AppConditionsRepo = (*AppConditionsRepo)(nil) + +type AppConditionsRepo struct { + db *sqlx.DB +} + +func NewAppConditionsRepo(db *sqlx.DB) *AppConditionsRepo { + return &AppConditionsRepo{db: db} +} + +// AppendCondition appends cond to the stored conditions for the given app within a +// single transaction. If the total exceeds maxConditions, the oldest entries are +// trimmed. Creates the row if it does not exist. +func (r *AppConditionsRepo) AppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition, maxConditions int) error { + tx, err := r.db.BeginTxx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction for app conditions: %w", err) + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + // Read existing conditions inside the transaction. + var raw []byte + err = tx.QueryRowContext(ctx, + "SELECT conditions FROM app_conditions WHERE project = $1 AND domain = $2 AND name = $3 FOR UPDATE", + appID.GetProject(), appID.GetDomain(), appID.GetName(), + ).Scan(&raw) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to read app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + conditions, err := unmarshalConditions(raw) + if err != nil { + return fmt.Errorf("failed to unmarshal app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + // Append and trim. + conditions = append(conditions, cond) + if maxConditions > 0 && len(conditions) > maxConditions { + conditions = conditions[len(conditions)-maxConditions:] + } + + raw, err = marshalConditions(conditions) + if err != nil { + return fmt.Errorf("failed to marshal app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + _, err = tx.ExecContext(ctx, + `INSERT INTO app_conditions (project, domain, name, conditions, updated_at) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (project, domain, name) DO UPDATE SET + conditions = EXCLUDED.conditions, + updated_at = EXCLUDED.updated_at`, + appID.GetProject(), appID.GetDomain(), appID.GetName(), raw, time.Now().UTC(), + ) + if err != nil { + return fmt.Errorf("failed to upsert app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + return tx.Commit() +} + +// GetConditions returns the stored conditions for the given app. +// Returns nil if no conditions have been recorded yet. +func (r *AppConditionsRepo) GetConditions(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Condition, error) { + var raw []byte + err := r.db.QueryRowContext(ctx, + "SELECT conditions FROM app_conditions WHERE project = $1 AND domain = $2 AND name = $3", + appID.GetProject(), appID.GetDomain(), appID.GetName(), + ).Scan(&raw) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + conditions, err := unmarshalConditions(raw) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + return conditions, nil +} + +// DeleteConditions removes the conditions row for the given app. +// No-ops if the row does not exist. +func (r *AppConditionsRepo) DeleteConditions(ctx context.Context, appID *flyteapp.Identifier) error { + _, err := r.db.ExecContext(ctx, + "DELETE FROM app_conditions WHERE project = $1 AND domain = $2 AND name = $3", + appID.GetProject(), appID.GetDomain(), appID.GetName(), + ) + if err != nil { + return fmt.Errorf("failed to delete app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + return nil +} + +// marshalConditions serializes a slice of Condition using Status as a proto wrapper. +func marshalConditions(conditions []*flyteapp.Condition) ([]byte, error) { + return proto.Marshal(&flyteapp.Status{Conditions: conditions}) +} + +// unmarshalConditions deserializes conditions from bytes produced by marshalConditions. +// Returns nil for empty/nil input. +func unmarshalConditions(raw []byte) ([]*flyteapp.Condition, error) { + if len(raw) == 0 { + return nil, nil + } + status := &flyteapp.Status{} + if err := proto.Unmarshal(raw, status); err != nil { + return nil, err + } + return status.GetConditions(), nil +} diff --git a/app/internal/repository/interfaces/app_conditions.go b/app/internal/repository/interfaces/app_conditions.go new file mode 100644 index 00000000000..3be525d9cdd --- /dev/null +++ b/app/internal/repository/interfaces/app_conditions.go @@ -0,0 +1,24 @@ +package interfaces + +import ( + "context" + + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" +) + +// AppConditionsRepo persists and retrieves the condition history for apps. +// Each app has at most one row; conditions are stored as a serialized proto array. +type AppConditionsRepo interface { + // AppendCondition appends cond to the stored conditions for the given app, + // trimming the oldest entries if the total exceeds maxConditions. + // Creates the row if it does not exist. + AppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition, maxConditions int) error + + // GetConditions returns the stored conditions for the given app. + // Returns nil (not an error) if no conditions have been recorded yet. + GetConditions(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Condition, error) + + // DeleteConditions removes the conditions row for the given app. + // No-ops if the row does not exist. + DeleteConditions(ctx context.Context, appID *flyteapp.Identifier) error +} From 463639e5a1ea6614d2c0dbb48c466496ae1719e8 Mon Sep 17 00:00:00 2001 From: machichima Date: Fri, 24 Apr 2026 09:00:20 +0800 Subject: [PATCH 3/6] feat: call repo impl in service Signed-off-by: machichima --- .mockery.yaml | 4 + app/config/config.go | 10 +- .../repository/impl/app_conditions.go | 3 + app/internal/repository/mocks/mocks.go | 233 ++++++++++++++++++ app/internal/service/internal_app_service.go | 54 +++- app/internal/setup.go | 9 +- 6 files changed, 299 insertions(+), 14 deletions(-) create mode 100644 app/internal/repository/mocks/mocks.go diff --git a/.mockery.yaml b/.mockery.yaml index 96d14d8de22..c84ae1de929 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -103,6 +103,10 @@ packages: dir: actions/service/mocks interfaces: ActionsClientInterface: + github.com/flyteorg/flyte/v2/app/internal/repository/interfaces: + config: + all: true + dir: app/internal/repository/mocks github.com/eko/gocache/lib/v4/cache: config: all: false diff --git a/app/config/config.go b/app/config/config.go index 039ec898c51..406d60eebaa 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -2,7 +2,11 @@ package config import "time" -// AppConfig holds configuration for the control plane AppService. +var defaultConfig = &InternalAppConfig{ + MaxConditions: 40, +} + +// AppConfig holds configuration for the AppService. type AppConfig struct { // InternalAppServiceURL is the base URL of the InternalAppService (data plane). // In unified mode this is overridden by the shared mux BaseURL. @@ -49,4 +53,8 @@ type InternalAppConfig struct { // Use this to inject cluster-internal endpoints (e.g. _U_EP_OVERRIDE) that app // processes need to connect back to the Flyte manager. DefaultEnvVars map[string]string `json:"defaultEnvVars" pflag:"-,Default env vars injected into every app pod"` + + // MaxConditions is the maximum number of conditions to retain per app. + // Oldest entries are trimmed when this limit is exceeded. Defaults to 40. + MaxConditions int `json:"maxConditions" pflag:",Maximum number of conditions to retain per app"` } diff --git a/app/internal/repository/impl/app_conditions.go b/app/internal/repository/impl/app_conditions.go index c9c5884baa1..d2f780ccaab 100644 --- a/app/internal/repository/impl/app_conditions.go +++ b/app/internal/repository/impl/app_conditions.go @@ -11,6 +11,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" ) @@ -56,6 +57,8 @@ func (r *AppConditionsRepo) AppendCondition(ctx context.Context, appID *flyteapp // Append and trim. conditions = append(conditions, cond) if maxConditions > 0 && len(conditions) > maxConditions { + logger.Debugf(ctx, "Trimming conditions for app %s/%s/%s from %d to %d", + appID.GetProject(), appID.GetDomain(), appID.GetName(), len(conditions), maxConditions) conditions = conditions[len(conditions)-maxConditions:] } diff --git a/app/internal/repository/mocks/mocks.go b/app/internal/repository/mocks/mocks.go new file mode 100644 index 00000000000..94a71849644 --- /dev/null +++ b/app/internal/repository/mocks/mocks.go @@ -0,0 +1,233 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package mocks + +import ( + "context" + + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + mock "github.com/stretchr/testify/mock" +) + +// NewAppConditionsRepo creates a new instance of AppConditionsRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAppConditionsRepo(t interface { + mock.TestingT + Cleanup(func()) +}) *AppConditionsRepo { + mock := &AppConditionsRepo{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// AppConditionsRepo is an autogenerated mock type for the AppConditionsRepo type +type AppConditionsRepo struct { + mock.Mock +} + +type AppConditionsRepo_Expecter struct { + mock *mock.Mock +} + +func (_m *AppConditionsRepo) EXPECT() *AppConditionsRepo_Expecter { + return &AppConditionsRepo_Expecter{mock: &_m.Mock} +} + +// AppendCondition provides a mock function for the type AppConditionsRepo +func (_mock *AppConditionsRepo) AppendCondition(ctx context.Context, appID *app.Identifier, cond *app.Condition, maxConditions int) error { + ret := _mock.Called(ctx, appID, cond, maxConditions) + + if len(ret) == 0 { + panic("no return value specified for AppendCondition") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier, *app.Condition, int) error); ok { + r0 = returnFunc(ctx, appID, cond, maxConditions) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// AppConditionsRepo_AppendCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AppendCondition' +type AppConditionsRepo_AppendCondition_Call struct { + *mock.Call +} + +// AppendCondition is a helper method to define mock.On call +// - ctx context.Context +// - appID *app.Identifier +// - cond *app.Condition +// - maxConditions int +func (_e *AppConditionsRepo_Expecter) AppendCondition(ctx interface{}, appID interface{}, cond interface{}, maxConditions interface{}) *AppConditionsRepo_AppendCondition_Call { + return &AppConditionsRepo_AppendCondition_Call{Call: _e.mock.On("AppendCondition", ctx, appID, cond, maxConditions)} +} + +func (_c *AppConditionsRepo_AppendCondition_Call) Run(run func(ctx context.Context, appID *app.Identifier, cond *app.Condition, maxConditions int)) *AppConditionsRepo_AppendCondition_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *app.Identifier + if args[1] != nil { + arg1 = args[1].(*app.Identifier) + } + var arg2 *app.Condition + if args[2] != nil { + arg2 = args[2].(*app.Condition) + } + var arg3 int + if args[3] != nil { + arg3 = args[3].(int) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *AppConditionsRepo_AppendCondition_Call) Return(err error) *AppConditionsRepo_AppendCondition_Call { + _c.Call.Return(err) + return _c +} + +func (_c *AppConditionsRepo_AppendCondition_Call) RunAndReturn(run func(ctx context.Context, appID *app.Identifier, cond *app.Condition, maxConditions int) error) *AppConditionsRepo_AppendCondition_Call { + _c.Call.Return(run) + return _c +} + +// DeleteConditions provides a mock function for the type AppConditionsRepo +func (_mock *AppConditionsRepo) DeleteConditions(ctx context.Context, appID *app.Identifier) error { + ret := _mock.Called(ctx, appID) + + if len(ret) == 0 { + panic("no return value specified for DeleteConditions") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier) error); ok { + r0 = returnFunc(ctx, appID) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// AppConditionsRepo_DeleteConditions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteConditions' +type AppConditionsRepo_DeleteConditions_Call struct { + *mock.Call +} + +// DeleteConditions is a helper method to define mock.On call +// - ctx context.Context +// - appID *app.Identifier +func (_e *AppConditionsRepo_Expecter) DeleteConditions(ctx interface{}, appID interface{}) *AppConditionsRepo_DeleteConditions_Call { + return &AppConditionsRepo_DeleteConditions_Call{Call: _e.mock.On("DeleteConditions", ctx, appID)} +} + +func (_c *AppConditionsRepo_DeleteConditions_Call) Run(run func(ctx context.Context, appID *app.Identifier)) *AppConditionsRepo_DeleteConditions_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *app.Identifier + if args[1] != nil { + arg1 = args[1].(*app.Identifier) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *AppConditionsRepo_DeleteConditions_Call) Return(err error) *AppConditionsRepo_DeleteConditions_Call { + _c.Call.Return(err) + return _c +} + +func (_c *AppConditionsRepo_DeleteConditions_Call) RunAndReturn(run func(ctx context.Context, appID *app.Identifier) error) *AppConditionsRepo_DeleteConditions_Call { + _c.Call.Return(run) + return _c +} + +// GetConditions provides a mock function for the type AppConditionsRepo +func (_mock *AppConditionsRepo) GetConditions(ctx context.Context, appID *app.Identifier) ([]*app.Condition, error) { + ret := _mock.Called(ctx, appID) + + if len(ret) == 0 { + panic("no return value specified for GetConditions") + } + + var r0 []*app.Condition + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier) ([]*app.Condition, error)); ok { + return returnFunc(ctx, appID) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier) []*app.Condition); ok { + r0 = returnFunc(ctx, appID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*app.Condition) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, *app.Identifier) error); ok { + r1 = returnFunc(ctx, appID) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// AppConditionsRepo_GetConditions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetConditions' +type AppConditionsRepo_GetConditions_Call struct { + *mock.Call +} + +// GetConditions is a helper method to define mock.On call +// - ctx context.Context +// - appID *app.Identifier +func (_e *AppConditionsRepo_Expecter) GetConditions(ctx interface{}, appID interface{}) *AppConditionsRepo_GetConditions_Call { + return &AppConditionsRepo_GetConditions_Call{Call: _e.mock.On("GetConditions", ctx, appID)} +} + +func (_c *AppConditionsRepo_GetConditions_Call) Run(run func(ctx context.Context, appID *app.Identifier)) *AppConditionsRepo_GetConditions_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *app.Identifier + if args[1] != nil { + arg1 = args[1].(*app.Identifier) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *AppConditionsRepo_GetConditions_Call) Return(conditions []*app.Condition, err error) *AppConditionsRepo_GetConditions_Call { + _c.Call.Return(conditions, err) + return _c +} + +func (_c *AppConditionsRepo_GetConditions_Call) RunAndReturn(run func(ctx context.Context, appID *app.Identifier) ([]*app.Condition, error)) *AppConditionsRepo_GetConditions_Call { + _c.Call.Return(run) + return _c +} diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index f013fab381d..e3c406c0713 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -11,23 +11,25 @@ import ( appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" ) // InternalAppService is the data plane implementation of the AppService. -// It has direct K8s access via AppK8sClientInterface and no database dependency — -// all app state lives in KService CRDs. +// It has direct K8s access via AppK8sClientInterface. +// Condition history is persisted in the database via conditionRepo. type InternalAppService struct { appconnect.UnimplementedAppServiceHandler - k8s appk8s.AppK8sClientInterface - cfg *appconfig.InternalAppConfig + k8s appk8s.AppK8sClientInterface + conditionRepo interfaces.AppConditionsRepo + cfg *appconfig.InternalAppConfig } // NewInternalAppService creates a new InternalAppService. -func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.InternalAppConfig) *InternalAppService { - return &InternalAppService{k8s: k8s, cfg: cfg} +func NewInternalAppService(k8s appk8s.AppK8sClientInterface, conditionRepo interfaces.AppConditionsRepo, cfg *appconfig.InternalAppConfig) *InternalAppService { + return &InternalAppService{k8s: k8s, conditionRepo: conditionRepo, cfg: cfg} } // Ensure InternalAppService satisfies the generated handler interface. @@ -54,13 +56,17 @@ func (s *InternalAppService) Create( return nil, connect.NewError(connect.CodeInternal, err) } + cond := &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, + LastTransitionTime: timestamppb.Now(), + Message: "App is pending deployment", + } + if err := s.conditionRepo.AppendCondition(ctx, app.GetMetadata().GetId(), cond, s.cfg.MaxConditions); err != nil { + logger.Errorf(ctx, "Failed to persist condition for app %s: %v", app.GetMetadata().GetId().GetName(), err) + } + app.Status = &flyteapp.Status{ - Conditions: []*flyteapp.Condition{ - { - DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, - LastTransitionTime: timestamppb.Now(), - }, - }, + Conditions: []*flyteapp.Condition{cond}, Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg), } @@ -106,6 +112,12 @@ func (s *InternalAppService) Get( return nil, connect.NewError(connect.CodeInternal, err) } + conditions, err := s.conditionRepo.GetConditions(ctx, appID.AppId) + if err != nil { + logger.Errorf(ctx, "Failed to get conditions for app %s: %v", appID.AppId.GetName(), err) + } + status.Conditions = conditions + return connect.NewResponse(&flyteapp.GetResponse{ App: &flyteapp.App{ Metadata: &flyteapp.Meta{Id: appID.AppId}, @@ -129,18 +141,32 @@ func (s *InternalAppService) Update( appID := app.GetMetadata().GetId() + var cond *flyteapp.Condition switch app.GetSpec().GetDesiredState() { case flyteapp.Spec_DESIRED_STATE_STOPPED: if err := s.k8s.Stop(ctx, appID); err != nil { logger.Errorf(ctx, "Failed to stop app %s: %v", appID.GetName(), err) return nil, connect.NewError(connect.CodeInternal, err) } + cond = &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, + LastTransitionTime: timestamppb.Now(), + Message: "App scaled to zero", + } default: // UNSPECIFIED, STARTED, ACTIVE — deploy/redeploy the spec. if err := s.k8s.Deploy(ctx, app); err != nil { logger.Errorf(ctx, "Failed to update app %s: %v", appID.GetName(), err) return nil, connect.NewError(connect.CodeInternal, err) } + cond = &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, + LastTransitionTime: timestamppb.Now(), + Message: "App is pending deployment", + } + } + if err := s.conditionRepo.AppendCondition(ctx, appID, cond, s.cfg.MaxConditions); err != nil { + logger.Errorf(ctx, "Failed to persist condition for app %s: %v", appID.GetName(), err) } status, err := s.k8s.GetStatus(ctx, appID) @@ -167,6 +193,10 @@ func (s *InternalAppService) Delete( return nil, connect.NewError(connect.CodeInternal, err) } + if err := s.conditionRepo.DeleteConditions(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to delete conditions for app %s: %v", appID.GetName(), err) + } + return connect.NewResponse(&flyteapp.DeleteResponse{}), nil } diff --git a/app/internal/setup.go b/app/internal/setup.go index 91023cb4d7e..847515cbacb 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -10,6 +10,8 @@ import ( appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/app/internal/migrations" + repoimpl "github.com/flyteorg/flyte/v2/app/internal/repository/impl" "github.com/flyteorg/flyte/v2/app/internal/service" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" knativeapp "github.com/flyteorg/flyte/v2/flytestdlib/app" @@ -28,8 +30,13 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter return fmt.Errorf("internalapp: failed to register Knative scheme: %w", err) } + if err := migrations.RunMigrations(ctx, sc.DB); err != nil { + return fmt.Errorf("internalapp: failed to run migrations: %w", err) + } + appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg) - internalAppSvc := service.NewInternalAppService(appK8sClient, cfg) + conditionRepo := repoimpl.NewAppConditionsRepo(sc.DB) + internalAppSvc := service.NewInternalAppService(appK8sClient, conditionRepo, cfg) path, handler := appconnect.NewAppServiceHandler(internalAppSvc) sc.Mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) From ea5d598d5c4efd47d01ea0a2fc92105fad69f4f4 Mon Sep 17 00:00:00 2001 From: machichima Date: Fri, 24 Apr 2026 09:54:20 +0800 Subject: [PATCH 4/6] test: fix test Signed-off-by: machichima --- .../service/internal_app_service_test.go | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index 13ce54a1441..218ab0eb8fd 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" appconfig "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/app/internal/repository/mocks" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" @@ -74,6 +75,15 @@ func (m *mockAppK8sClient) Watch(ctx context.Context, project, domain, appName s // --- helpers --- +// newTestRepo returns a mock AppConditionsRepo that silently accepts any call. +func newTestRepo(t *testing.T) *mocks.AppConditionsRepo { + repo := mocks.NewAppConditionsRepo(t) + repo.On("AppendCondition", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + repo.On("GetConditions", mock.Anything, mock.Anything).Return(([]*flyteapp.Condition)(nil), nil).Maybe() + repo.On("DeleteConditions", mock.Anything, mock.Anything).Return(nil).Maybe() + return repo +} + func testCfg() *appconfig.InternalAppConfig { return &appconfig.InternalAppConfig{ Enabled: true, @@ -108,7 +118,7 @@ func testStatus(phase flyteapp.Status_DeploymentStatus) *flyteapp.Status { } func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceClient { - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) path, handler := appconnect.NewAppServiceHandler(svc) mux := http.NewServeMux() mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) @@ -121,7 +131,7 @@ func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceCli func TestCreate_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) @@ -134,7 +144,7 @@ func TestCreate_Success(t *testing.T) { } func TestCreate_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{Spec: testApp().Spec}, @@ -144,7 +154,7 @@ func TestCreate_MissingID(t *testing.T) { } func TestCreate_MissingSpec(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{Metadata: &flyteapp.Meta{Id: testAppID()}}, @@ -154,7 +164,7 @@ func TestCreate_MissingSpec(t *testing.T) { } func TestCreate_MissingPayload(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{ @@ -170,7 +180,7 @@ func TestCreate_IngressWithPort(t *testing.T) { k8s := &mockAppK8sClient{} cfg := testCfg() cfg.IngressAppsPort = 30081 - svc := NewInternalAppService(k8s, cfg) + svc := NewInternalAppService(k8s, newTestRepo(t), cfg) app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) @@ -185,7 +195,7 @@ func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { k8s := &mockAppK8sClient{} cfg := testCfg() cfg.BaseDomain = "" - svc := NewInternalAppService(k8s, cfg) + svc := NewInternalAppService(k8s, newTestRepo(t), cfg) app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) @@ -200,10 +210,13 @@ func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { func TestGet_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + repo := mocks.NewAppConditionsRepo(t) + svc := NewInternalAppService(k8s, repo, testCfg()) appID := testAppID() + dbConditions := []*flyteapp.Condition{{DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE}} k8s.On("GetStatus", mock.Anything, appID).Return(testStatus(flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE), nil) + repo.On("GetConditions", mock.Anything, appID).Return(dbConditions, nil) resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, @@ -214,7 +227,7 @@ func TestGet_Success(t *testing.T) { } func TestGet_MissingAppID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{})) require.Error(t, err) @@ -225,7 +238,7 @@ func TestGet_MissingAppID(t *testing.T) { func TestUpdate_Deploy(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) @@ -239,7 +252,7 @@ func TestUpdate_Deploy(t *testing.T) { func TestUpdate_Stop(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) app := testApp() app.Spec.DesiredState = flyteapp.Spec_DESIRED_STATE_STOPPED @@ -253,7 +266,7 @@ func TestUpdate_Stop(t *testing.T) { } func TestUpdate_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{ App: &flyteapp.App{}, @@ -266,7 +279,7 @@ func TestUpdate_MissingID(t *testing.T) { func TestDelete_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) appID := testAppID() k8s.On("Delete", mock.Anything, appID).Return(nil) @@ -277,7 +290,7 @@ func TestDelete_Success(t *testing.T) { } func TestDelete_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{})) require.Error(t, err) @@ -288,7 +301,7 @@ func TestDelete_MissingID(t *testing.T) { func TestList_ByProject(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) apps := []*flyteapp.App{testApp()} k8s.On("List", mock.Anything, "proj", "dev", uint32(10), "tok").Return(apps, "nexttok", nil) @@ -307,7 +320,7 @@ func TestList_ByProject(t *testing.T) { func TestList_NoFilter(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) k8s.On("List", mock.Anything, "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) From b8f8ba600edb121bc3e6895118bb98125d5f6d5f Mon Sep 17 00:00:00 2001 From: machichima Date: Tue, 28 Apr 2026 09:52:54 +0800 Subject: [PATCH 5/6] fix: ingress public url Signed-off-by: machichima --- app/internal/k8s/app_client.go | 17 +++++++++----- app/internal/service/internal_app_service.go | 23 +------------------ .../service/internal_app_service_test.go | 17 ++++++++++++-- 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index f60c80846f4..0c2574aaa5c 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -87,6 +87,11 @@ type AppK8sClientInterface interface { // Unsubscribe removes a subscription channel previously returned by Subscribe. Unsubscribe(appName string, ch chan *flyteapp.WatchResponse) + + // PublicIngress returns the deterministic public Ingress URL for the given app, + // matching Knative's domain-template so Kourier routes traffic correctly. + // Returns nil if BaseDomain is not configured. + PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress } // AppK8sClient implements AppK8sClientInterface using controller-runtime. @@ -432,9 +437,10 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit u return apps, list.Continue, nil } -// publicIngress returns the deterministic public URL for an app using the same -// logic as the service layer so GetApp/List/Watch are consistent with Create. -func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { +// PublicIngress returns the deterministic public URL for an app. +// The host follows Knative's domain-template "{kservice-name}-{namespace}.{domain}" +// so Kourier routes traffic correctly without extra ingress rules. +func (c *AppK8sClient) PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { if c.cfg.BaseDomain == "" { return nil } @@ -442,8 +448,7 @@ func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress if scheme == "" { scheme = "https" } - host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", - id.GetName(), id.GetProject(), id.GetDomain(), c.cfg.BaseDomain)) + host := fmt.Sprintf("%s-%s.%s", kserviceName(id), appNamespace, c.cfg.BaseDomain) url := scheme + "://" + host if c.cfg.IngressAppsPort != 0 { url += fmt.Sprintf(":%d", c.cfg.IngressAppsPort) @@ -720,7 +725,7 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser parts := strings.SplitN(appIDStr, "/", 3) if len(parts) == 3 { appID := &flyteapp.Identifier{Project: parts[0], Domain: parts[1], Name: parts[2]} - status.Ingress = c.publicIngress(appID) + status.Ingress = c.PublicIngress(appID) } } diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index 0cc467b9411..a8f4a98d072 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -3,7 +3,6 @@ package service import ( "context" "fmt" - "strings" "connectrpc.com/connect" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -67,32 +66,12 @@ func (s *InternalAppService) Create( app.Status = &flyteapp.Status{ Conditions: []*flyteapp.Condition{cond}, - Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg), + Ingress: s.k8s.PublicIngress(app.GetMetadata().GetId()), } return connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil } -// publicIngress builds the deterministic public URL for an app using -// BaseDomain — which must match Knative's domain-template so Kourier -// serves the URL directly. Returns nil if BaseDomain is unset. -func publicIngress(id *flyteapp.Identifier, cfg *appconfig.InternalAppConfig) *flyteapp.Ingress { - if cfg.BaseDomain == "" { - return nil - } - scheme := cfg.Scheme - if scheme == "" { - scheme = "https" - } - host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s", - id.GetName(), id.GetProject(), id.GetDomain(), cfg.BaseDomain)) - url := scheme + "://" + host - if cfg.IngressAppsPort != 0 { - url += fmt.Sprintf(":%d", cfg.IngressAppsPort) - } - return &flyteapp.Ingress{PublicUrl: url} -} - // Get retrieves an app and its live status from the KService CRD. // Note: App.Spec is not populated — status and ingress URL are the authoritative fields. func (s *InternalAppService) Get( diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index 8b84ec54a83..e5c96a699f4 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -82,6 +82,14 @@ func (m *mockAppK8sClient) Unsubscribe(appName string, ch chan *flyteapp.WatchRe m.Called(appName, ch) } +func (m *mockAppK8sClient) PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { + args := m.Called(id) + if args.Get(0) == nil { + return nil + } + return args.Get(0).(*flyteapp.Ingress) +} + // --- helpers --- // newTestRepo returns a mock AppConditionsRepo that silently accepts any call. @@ -146,12 +154,14 @@ func TestCreate_Success(t *testing.T) { svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) app := testApp() + ingress := &flyteapp.Ingress{PublicUrl: "https://myapp-3dcbfc92-flyte.example.com"} k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("PublicIngress", app.Metadata.Id).Return(ingress) resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_PENDING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) - assert.Equal(t, "https://myapp-proj-dev.example.com", resp.Msg.App.Status.Ingress.PublicUrl) + assert.Equal(t, ingress.PublicUrl, resp.Msg.App.Status.Ingress.PublicUrl) k8s.AssertExpectations(t) } @@ -195,11 +205,13 @@ func TestCreate_IngressWithPort(t *testing.T) { svc := NewInternalAppService(k8s, newTestRepo(t), cfg) app := testApp() + ingress := &flyteapp.Ingress{PublicUrl: "https://myapp-3dcbfc92-flyte.example.com:30081"} k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("PublicIngress", app.Metadata.Id).Return(ingress) resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) - assert.Equal(t, "https://myapp-proj-dev.example.com:30081", resp.Msg.App.Status.Ingress.PublicUrl) + assert.Equal(t, ingress.PublicUrl, resp.Msg.App.Status.Ingress.PublicUrl) k8s.AssertExpectations(t) } @@ -211,6 +223,7 @@ func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("PublicIngress", app.Metadata.Id).Return((*flyteapp.Ingress)(nil)) resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) From 1848163e377aff73ccf8e37eaf7581cfbe8d1d0d Mon Sep 17 00:00:00 2001 From: machichima Date: Tue, 28 Apr 2026 13:05:40 +0800 Subject: [PATCH 6/6] feat: persist app conditions to DB Signed-off-by: machichima --- app/internal/k8s/app_client.go | 89 ++++++++++--- app/internal/k8s/app_client_test.go | 123 +++++++++++++++++- app/internal/service/internal_app_service.go | 66 ++++------ .../service/internal_app_service_test.go | 57 ++++---- app/internal/setup.go | 4 +- 5 files changed, 248 insertions(+), 91 deletions(-) diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 0c2574aaa5c..6e6a7aa3612 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" "github.com/flyteorg/flyte/v2/flytestdlib/k8s" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" @@ -96,9 +97,10 @@ type AppK8sClientInterface interface { // AppK8sClient implements AppK8sClientInterface using controller-runtime. type AppK8sClient struct { - k8sClient client.WithWatch - cache ctrlcache.Cache - cfg *config.InternalAppConfig + k8sClient client.WithWatch + cache ctrlcache.Cache + cfg *config.InternalAppConfig + conditionRepo interfaces.AppConditionsRepo // Watch management mu sync.RWMutex @@ -108,12 +110,15 @@ type AppK8sClient struct { } // NewAppK8sClient creates a new AppK8sClient. -func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig) *AppK8sClient { +// conditionRepo is used to persist condition history on every KService event; +// pass nil to disable condition persistence (e.g. in tests that do not need it). +func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig, conditionRepo interfaces.AppConditionsRepo) *AppK8sClient { return &AppK8sClient{ - k8sClient: k8sClient, - cache: cache, - cfg: cfg, - subscribers: make(map[string]map[chan *flyteapp.WatchResponse]struct{}), + k8sClient: k8sClient, + cache: cache, + cfg: cfg, + conditionRepo: conditionRepo, + subscribers: make(map[string]map[chan *flyteapp.WatchResponse]struct{}), } } @@ -277,14 +282,37 @@ func isManagedKService(ksvc *servingv1.Service) bool { return ksvc.Labels[labelAppManaged] == "true" } -// handleKServiceEvent converts a KService event into a WatchResponse and -// notifies all matching subscribers. +// handleKServiceEvent converts a KService event into a WatchResponse, +// persists the current deployment status as a condition, and notifies subscribers. func (c *AppK8sClient) handleKServiceEvent(ctx context.Context, ksvc *servingv1.Service, eventType k8swatch.EventType) { app, err := c.kserviceToApp(ctx, ksvc) if err != nil { return } + appID := app.GetMetadata().GetId() + + // Persist the current status as a condition on every K8s state change so that + // the full PENDING -> DEPLOYING -> ACTIVE/FAILED transition history is recorded, + // regardless of whether any Watch client is connected. + // Dedup: read the latest stored condition first; only append if status or message changed. + if c.conditionRepo != nil { + if eventType == k8swatch.Deleted { + // Ensure DB is clean even if a race between Delete() RPC and a late + // Modified event caused conditions to be re-written after the RPC cleared them. + if err := c.conditionRepo.DeleteConditions(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to delete conditions for app %s on watch deleted event: %v", appID.GetName(), err) + } + } else if status := app.GetStatus(); status != nil && len(status.GetConditions()) > 0 { + cond := status.GetConditions()[0] + if c.shouldAppendCondition(ctx, appID, cond) { + if err := c.conditionRepo.AppendCondition(ctx, appID, cond, c.cfg.MaxConditions); err != nil { + logger.Errorf(ctx, "Failed to persist condition for app %s on watch event: %v", appID.GetName(), err) + } + } + } + } + var resp *flyteapp.WatchResponse switch eventType { case k8swatch.Added: @@ -309,8 +337,29 @@ func (c *AppK8sClient) handleKServiceEvent(ctx context.Context, ksvc *servingv1. return } - appName := app.GetMetadata().GetId().GetName() - c.notifySubscribers(ctx, appName, resp) + c.notifySubscribers(ctx, appID.GetName(), resp) +} + +// shouldAppendCondition returns true if cond should be persisted. +// It skips two classes of noise: +// 1. PENDING with no message — the transient first Knative event before the +// Ready condition is populated; it carries no actionable information. +// 2. Exact duplicates (same DeploymentStatus + Message as the last stored entry). +func (c *AppK8sClient) shouldAppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition) bool { + if cond.GetDeploymentStatus() == flyteapp.Status_DEPLOYMENT_STATUS_PENDING && cond.GetMessage() == "" { + return false + } + existing, err := c.conditionRepo.GetConditions(ctx, appID) + if err != nil { + // On read error, append to avoid missing transitions. + logger.Warnf(ctx, "Failed to read conditions for app %s, will append: %v", appID.GetName(), err) + return true + } + if len(existing) == 0 { + return true + } + last := existing[len(existing)-1] + return last.GetDeploymentStatus() != cond.GetDeploymentStatus() || last.GetMessage() != cond.GetMessage() } // notifySubscribers sends a WatchResponse to all subscribers for the given app name. @@ -702,18 +751,28 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser } if phase == flyteapp.Status_DEPLOYMENT_STATUS_UNSPECIFIED { + readyCond := ksvc.Status.GetCondition(servingv1.ServiceConditionReady) switch { case ksvc.IsReady(): phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE + message = "Service is ready" case ksvc.IsFailed(): phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED - if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { - message = c.Message + if readyCond != nil { + message = readyCond.Message } - case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: + case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName && + ksvc.Status.LatestCreatedRevisionName != "": phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING + message = fmt.Sprintf("Deploying revision: [%s]", ksvc.Status.LatestCreatedRevisionName) + if readyCond != nil && readyCond.Reason != "" { + message += fmt.Sprintf("\n%s: %s", readyCond.Reason, readyCond.Message) + } default: phase = flyteapp.Status_DEPLOYMENT_STATUS_PENDING + if readyCond != nil && readyCond.Reason != "" { + message = fmt.Sprintf("%s: %s", readyCond.Reason, readyCond.Message) + } } } diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 8df0b2b8cb3..1c364949cbe 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -19,6 +20,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/app/internal/repository/mocks" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) @@ -58,7 +60,7 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { MaxRequestTimeout: time.Hour, WatchBufferSize: 100, } - return NewAppK8sClient(fc, nil, cfg) + return NewAppK8sClient(fc, nil, cfg, nil) } // testApp builds a minimal flyteapp.App for use in tests. @@ -584,6 +586,20 @@ func TestSubscribe_MultipleSubscribers(t *testing.T) { } } +// testClientWithRepo builds an AppK8sClient with a real conditionRepo mock. +func testClientWithRepo(t *testing.T, repo *mocks.AppConditionsRepo, objs ...client.Object) *AppK8sClient { + t.Helper() + s := testScheme(t) + fc := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build() + cfg := &config.InternalAppConfig{ + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + WatchBufferSize: 100, + MaxConditions: 40, + } + return NewAppK8sClient(fc, nil, cfg, repo) +} + // testKsvc builds a minimal KService that kserviceToApp can parse. func testKsvc(name, ns, rv string) *servingv1.Service { return &servingv1.Service{ @@ -596,3 +612,108 @@ func testKsvc(name, ns, rv string) *servingv1.Service { }, } } + +// --- Condition dedup and message format tests --- + +func TestHandleKServiceEvent_DeduplicatesConditions(t *testing.T) { + repo := mocks.NewAppConditionsRepo(t) + c := testClientWithRepo(t, repo) + + ksvc := testKsvc("myapp", appNamespace, "1") + appID := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + + existingCond := &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, + Message: "", + } + + // First call: no existing conditions → should append. + repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{}, nil).Once() + repo.On("AppendCondition", mock.Anything, appID, mock.Anything, mock.Anything).Return(nil).Once() + + c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified) + + // Second call: same status+message already stored → should NOT append. + repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{existingCond}, nil).Once() + + c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified) + + repo.AssertExpectations(t) +} + +func TestHandleKServiceEvent_AppendsOnStatusChange(t *testing.T) { + repo := mocks.NewAppConditionsRepo(t) + c := testClientWithRepo(t, repo) + + ksvc := testKsvc("myapp", appNamespace, "1") + appID := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + + // Existing condition is DEPLOYING, new event is PENDING → different → should append. + existingCond := &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING, + Message: "Deploying revision: [myapp-00001]", + } + repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{existingCond}, nil).Once() + repo.On("AppendCondition", mock.Anything, appID, mock.Anything, mock.Anything).Return(nil).Once() + + c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified) + + repo.AssertExpectations(t) +} + +func TestKserviceToStatus_Messages(t *testing.T) { + tests := []struct { + name string + ksvc func() *servingv1.Service + wantPhase flyteapp.Status_DeploymentStatus + wantMessage string + }{ + { + name: "active", + ksvc: func() *servingv1.Service { + ksvc := testKsvc("myapp", appNamespace, "1") + ksvc.Status.MarkRouteReady() + ksvc.Status.MarkConfigurationReady() + return ksvc + }, + wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE, + wantMessage: "Service is ready", + }, + { + name: "deploying with knative reason", + ksvc: func() *servingv1.Service { + ksvc := testKsvc("myapp", appNamespace, "1") + ksvc.Status.LatestCreatedRevisionName = "myapp-00002" + ksvc.Status.LatestReadyRevisionName = "myapp-00001" + ksvc.Status.MarkRouteNotYetReady() + return ksvc + }, + wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING, + wantMessage: "Deploying revision: [myapp-00002]", + }, + { + name: "stopped", + ksvc: func() *servingv1.Service { + ksvc := testKsvc("myapp", appNamespace, "1") + if ksvc.Spec.Template.Annotations == nil { + ksvc.Spec.Template.Annotations = map[string]string{} + } + ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"] = "0" + return ksvc + }, + wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, + wantMessage: "App scaled to zero", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := testClient(t) + status := c.kserviceToStatus(context.Background(), tt.ksvc()) + require.NotNil(t, status) + require.NotEmpty(t, status.Conditions) + assert.Equal(t, tt.wantPhase, status.Conditions[0].DeploymentStatus) + assert.Equal(t, tt.wantMessage, status.Conditions[0].Message) + }) + } +} diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index a8f4a98d072..85d9b6aa8e2 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -5,10 +5,8 @@ import ( "fmt" "connectrpc.com/connect" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" k8serrors "k8s.io/apimachinery/pkg/api/errors" - appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" "github.com/flyteorg/flyte/v2/flytestdlib/logger" @@ -18,17 +16,17 @@ import ( // InternalAppService is the data plane implementation of the AppService. // It has direct K8s access via AppK8sClientInterface. -// Condition history is persisted in the database via conditionRepo. +// conditionRepo is used to read condition history; conditions are written by +// AppK8sClient.handleKServiceEvent on every KService informer event. type InternalAppService struct { appconnect.UnimplementedAppServiceHandler k8s appk8s.AppK8sClientInterface conditionRepo interfaces.AppConditionsRepo - cfg *appconfig.InternalAppConfig } // NewInternalAppService creates a new InternalAppService. -func NewInternalAppService(k8s appk8s.AppK8sClientInterface, conditionRepo interfaces.AppConditionsRepo, cfg *appconfig.InternalAppConfig) *InternalAppService { - return &InternalAppService{k8s: k8s, conditionRepo: conditionRepo, cfg: cfg} +func NewInternalAppService(k8s appk8s.AppK8sClientInterface, conditionRepo interfaces.AppConditionsRepo) *InternalAppService { + return &InternalAppService{k8s: k8s, conditionRepo: conditionRepo} } // Ensure InternalAppService satisfies the generated handler interface. @@ -55,18 +53,8 @@ func (s *InternalAppService) Create( return nil, connect.NewError(connect.CodeInternal, err) } - cond := &flyteapp.Condition{ - DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, - LastTransitionTime: timestamppb.Now(), - Message: "App is pending deployment", - } - if err := s.conditionRepo.AppendCondition(ctx, app.GetMetadata().GetId(), cond, s.cfg.MaxConditions); err != nil { - logger.Errorf(ctx, "Failed to persist condition for app %s: %v", app.GetMetadata().GetId().GetName(), err) - } - app.Status = &flyteapp.Status{ - Conditions: []*flyteapp.Condition{cond}, - Ingress: s.k8s.PublicIngress(app.GetMetadata().GetId()), + Ingress: s.k8s.PublicIngress(app.GetMetadata().GetId()), } return connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil @@ -83,7 +71,18 @@ func (s *InternalAppService) Get( return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app_id is required")) } - app, err := s.k8s.GetApp(ctx, appID.AppId) + app, err := s.getWithConditions(ctx, appID.AppId) + if err != nil { + return nil, err + } + + return connect.NewResponse(&flyteapp.GetResponse{App: app}), nil +} + +// getWithConditions fetches an app from K8s and merges its stored DB condition +// history into the status. Both Get and Update use this so their responses are consistent. +func (s *InternalAppService) getWithConditions(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.App, error) { + app, err := s.k8s.GetApp(ctx, appID) if err != nil { if k8serrors.IsNotFound(err) { return nil, connect.NewError(connect.CodeNotFound, err) @@ -91,15 +90,14 @@ func (s *InternalAppService) Get( return nil, connect.NewError(connect.CodeInternal, err) } - conditions, err := s.conditionRepo.GetConditions(ctx, appID.AppId) + conditions, err := s.conditionRepo.GetConditions(ctx, appID) if err != nil { - logger.Errorf(ctx, "Failed to get conditions for app %s: %v", appID.AppId.GetName(), err) + logger.Errorf(ctx, "Failed to get conditions for app %s: %v", appID.GetName(), err) + } else { + app.Status.Conditions = conditions } - app.Status.Conditions = conditions - return connect.NewResponse(&flyteapp.GetResponse{ - App: app, - }), nil + return app, nil } // Update modifies an app's spec or desired state. @@ -117,37 +115,25 @@ func (s *InternalAppService) Update( appID := app.GetMetadata().GetId() - var cond *flyteapp.Condition switch app.GetSpec().GetDesiredState() { case flyteapp.Spec_DESIRED_STATE_STOPPED: if err := s.k8s.Stop(ctx, appID); err != nil { logger.Errorf(ctx, "Failed to stop app %s: %v", appID.GetName(), err) return nil, connect.NewError(connect.CodeInternal, err) } - cond = &flyteapp.Condition{ - DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, - LastTransitionTime: timestamppb.Now(), - Message: "App scaled to zero", - } default: // UNSPECIFIED, STARTED, ACTIVE — deploy/redeploy the spec. if err := s.k8s.Deploy(ctx, app); err != nil { logger.Errorf(ctx, "Failed to update app %s: %v", appID.GetName(), err) return nil, connect.NewError(connect.CodeInternal, err) } - cond = &flyteapp.Condition{ - DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, - LastTransitionTime: timestamppb.Now(), - Message: "App is pending deployment", - } - } - if err := s.conditionRepo.AppendCondition(ctx, appID, cond, s.cfg.MaxConditions); err != nil { - logger.Errorf(ctx, "Failed to persist condition for app %s: %v", appID.GetName(), err) } - freshApp, err := s.k8s.GetApp(ctx, appID) + // Return K8s live status merged with the full DB condition history, + // consistent with Get(). Condition recording is driven by handleKServiceEvent. + freshApp, err := s.getWithConditions(ctx, appID) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, err } app.Status = freshApp.Status diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index e5c96a699f4..d068a87d272 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -5,14 +5,12 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - appconfig "github.com/flyteorg/flyte/v2/app/internal/config" "github.com/flyteorg/flyte/v2/app/internal/repository/mocks" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" @@ -101,16 +99,6 @@ func newTestRepo(t *testing.T) *mocks.AppConditionsRepo { return repo } -func testCfg() *appconfig.InternalAppConfig { - return &appconfig.InternalAppConfig{ - Enabled: true, - BaseDomain: "example.com", - Scheme: "https", - DefaultRequestTimeout: 5 * time.Minute, - MaxRequestTimeout: time.Hour, - } -} - func testAppID() *flyteapp.Identifier { return &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} } @@ -138,7 +126,7 @@ func testAppWithStatus(phase flyteapp.Status_DeploymentStatus) *flyteapp.App { } func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceClient { - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) path, handler := appconnect.NewAppServiceHandler(svc) mux := http.NewServeMux() mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) @@ -151,7 +139,7 @@ func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceCli func TestCreate_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) app := testApp() ingress := &flyteapp.Ingress{PublicUrl: "https://myapp-3dcbfc92-flyte.example.com"} @@ -160,13 +148,14 @@ func TestCreate_Success(t *testing.T) { resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) - assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_PENDING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) + // Conditions are written by handleKServiceEvent, not by Create directly. + assert.Empty(t, resp.Msg.App.Status.Conditions) assert.Equal(t, ingress.PublicUrl, resp.Msg.App.Status.Ingress.PublicUrl) k8s.AssertExpectations(t) } func TestCreate_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{Spec: testApp().Spec}, @@ -176,7 +165,7 @@ func TestCreate_MissingID(t *testing.T) { } func TestCreate_MissingSpec(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{Metadata: &flyteapp.Meta{Id: testAppID()}}, @@ -186,7 +175,7 @@ func TestCreate_MissingSpec(t *testing.T) { } func TestCreate_MissingPayload(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{ @@ -200,9 +189,7 @@ func TestCreate_MissingPayload(t *testing.T) { func TestCreate_IngressWithPort(t *testing.T) { k8s := &mockAppK8sClient{} - cfg := testCfg() - cfg.IngressAppsPort = 30081 - svc := NewInternalAppService(k8s, newTestRepo(t), cfg) + svc := NewInternalAppService(k8s, newTestRepo(t)) app := testApp() ingress := &flyteapp.Ingress{PublicUrl: "https://myapp-3dcbfc92-flyte.example.com:30081"} @@ -217,9 +204,7 @@ func TestCreate_IngressWithPort(t *testing.T) { func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { k8s := &mockAppK8sClient{} - cfg := testCfg() - cfg.BaseDomain = "" - svc := NewInternalAppService(k8s, newTestRepo(t), cfg) + svc := NewInternalAppService(k8s, newTestRepo(t)) app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) @@ -236,7 +221,7 @@ func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { func TestGet_Success(t *testing.T) { k8s := &mockAppK8sClient{} repo := mocks.NewAppConditionsRepo(t) - svc := NewInternalAppService(k8s, repo, testCfg()) + svc := NewInternalAppService(k8s, repo) appID := testAppID() k8s.On("GetApp", mock.Anything, appID).Return(testAppWithStatus(flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE), nil) @@ -252,7 +237,7 @@ func TestGet_Success(t *testing.T) { } func TestGet_MissingAppID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{})) require.Error(t, err) @@ -263,11 +248,14 @@ func TestGet_MissingAppID(t *testing.T) { func TestUpdate_Deploy(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + repo := mocks.NewAppConditionsRepo(t) + svc := NewInternalAppService(k8s, repo) app := testApp() + dbConditions := []*flyteapp.Condition{{DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING}} k8s.On("Deploy", mock.Anything, app).Return(nil) k8s.On("GetApp", mock.Anything, app.Metadata.Id).Return(testAppWithStatus(flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING), nil) + repo.On("GetConditions", mock.Anything, app.Metadata.Id).Return(dbConditions, nil) resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) require.NoError(t, err) @@ -277,12 +265,15 @@ func TestUpdate_Deploy(t *testing.T) { func TestUpdate_Stop(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + repo := mocks.NewAppConditionsRepo(t) + svc := NewInternalAppService(k8s, repo) app := testApp() app.Spec.DesiredState = flyteapp.Spec_DESIRED_STATE_STOPPED + dbConditions := []*flyteapp.Condition{{DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED}} k8s.On("Stop", mock.Anything, app.Metadata.Id).Return(nil) k8s.On("GetApp", mock.Anything, app.Metadata.Id).Return(testAppWithStatus(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED), nil) + repo.On("GetConditions", mock.Anything, app.Metadata.Id).Return(dbConditions, nil) resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) require.NoError(t, err) @@ -291,7 +282,7 @@ func TestUpdate_Stop(t *testing.T) { } func TestUpdate_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{ App: &flyteapp.App{}, @@ -304,7 +295,7 @@ func TestUpdate_MissingID(t *testing.T) { func TestDelete_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) appID := testAppID() k8s.On("Delete", mock.Anything, appID).Return(nil) @@ -315,7 +306,7 @@ func TestDelete_Success(t *testing.T) { } func TestDelete_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t), testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{})) require.Error(t, err) @@ -326,7 +317,7 @@ func TestDelete_MissingID(t *testing.T) { func TestList_ByProject(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) apps := []*flyteapp.App{testApp()} k8s.On("List", mock.Anything, "proj", "dev", uint32(10), "tok").Return(apps, "nexttok", nil) @@ -345,7 +336,7 @@ func TestList_ByProject(t *testing.T) { func TestList_NoFilter(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, newTestRepo(t), testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) k8s.On("List", mock.Anything, "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) diff --git a/app/internal/setup.go b/app/internal/setup.go index 924768dd6f7..e7cb06d1b5b 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -33,9 +33,9 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter return fmt.Errorf("internalapp: failed to run migrations: %w", err) } - appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg) conditionRepo := repoimpl.NewAppConditionsRepo(sc.DB) - internalAppSvc := service.NewInternalAppService(appK8sClient, conditionRepo, cfg) + appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg, conditionRepo) + internalAppSvc := service.NewInternalAppService(appK8sClient, conditionRepo) if err := appK8sClient.StartWatching(ctx); err != nil { return fmt.Errorf("internalapp: failed to start KService watcher: %w", err)