diff --git a/api/orchestration/v1alpha1/roleset_types.go b/api/orchestration/v1alpha1/roleset_types.go index e2070f8f9..34b558543 100644 --- a/api/orchestration/v1alpha1/roleset_types.go +++ b/api/orchestration/v1alpha1/roleset_types.go @@ -150,6 +150,30 @@ type RoleSpec struct { // +optional Replicas *int32 `json:"replicas,omitempty"` + // Dependencies specifies the list of role names that must be ready before this role starts or is updated. + // This is used to define a dependency graph between different roles, ensuring correct ordering during + // both initial scale-up and rolling updates. Each element in the slice should correspond to the name + // of another role defined within the same StormService specification. + // + // For example, if RoleA depends on RoleB and RoleC, then RoleA's Dependencies would look like: + // Dependencies: []string{"RoleB", "RoleC"} + // + // - **During scale-up (initial startup or scaling from zero):** + // RoleA will not begin creating Pods until all its dependencies (e.g., RoleB and RoleC) have reached + // their desired number of ready replicas. + // + // - **During rolling updates:** + // RoleA will not proceed to update its existing Pods if any of its dependencies are not currently ready. + // This includes cases where a dependency is failing, has zero ready replicas, or is itself undergoing an update. + // Note: Controller **does not automatically trigger updates** on dependent roles (e.g., RoleB or RoleC). + // It only checks whether they are currently ready. You must explicitly update dependencies if needed. + // + // In both cases, if a dependency becomes unready at any time, RoleA’s scale-up or rollout will be paused + // until all dependencies are satisfied again. + // + // +optional + Dependencies []string `json:"dependencies,omitempty"` + // UpgradeOrder specifies the order in which this role should be upgraded. // Lower values are upgraded first. If not specified, roles upgrade after all explicitly ordered roles. 
// +optional diff --git a/api/orchestration/v1alpha1/zz_generated.deepcopy.go b/api/orchestration/v1alpha1/zz_generated.deepcopy.go index ba0d64064..c06b81cee 100644 --- a/api/orchestration/v1alpha1/zz_generated.deepcopy.go +++ b/api/orchestration/v1alpha1/zz_generated.deepcopy.go @@ -829,6 +829,11 @@ func (in *RoleSpec) DeepCopyInto(out *RoleSpec) { *out = new(int32) **out = **in } + if in.Dependencies != nil { + in, out := &in.Dependencies, &out.Dependencies + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.UpgradeOrder != nil { in, out := &in.UpgradeOrder, &out.UpgradeOrder *out = new(int32) diff --git a/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml b/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml index a13cc7334..c1a6c7c3f 100644 --- a/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml +++ b/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml @@ -38,6 +38,10 @@ spec: roles: items: properties: + dependencies: + items: + type: string + type: array disruptionTolerance: properties: maxUnavailable: diff --git a/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml b/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml index 99d59ef8b..d0ce7fb0a 100644 --- a/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml +++ b/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml @@ -113,6 +113,10 @@ spec: roles: items: properties: + dependencies: + items: + type: string + type: array disruptionTolerance: properties: maxUnavailable: diff --git a/dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml b/dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml index a13cc7334..c1a6c7c3f 100644 --- a/dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml +++ b/dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml @@ -38,6 +38,10 @@ spec: roles: items: properties: + dependencies: + items: + type: string + type: array disruptionTolerance: properties: 
maxUnavailable: diff --git a/dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml b/dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml index 99d59ef8b..d0ce7fb0a 100644 --- a/dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml +++ b/dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml @@ -113,6 +113,10 @@ spec: roles: items: properties: + dependencies: + items: + type: string + type: array disruptionTolerance: properties: maxUnavailable: diff --git a/pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go b/pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go index 6dfc6eae9..d6be97b73 100644 --- a/pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go +++ b/pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go @@ -26,6 +26,7 @@ import ( type RoleSpecApplyConfiguration struct { Name *string `json:"name,omitempty"` Replicas *int32 `json:"replicas,omitempty"` + Dependencies []string `json:"dependencies,omitempty"` UpgradeOrder *int32 `json:"upgradeOrder,omitempty"` PodGroupSize *int32 `json:"podGroupSize,omitempty"` UpdateStrategy *RoleUpdateStrategyApplyConfiguration `json:"updateStrategy,omitempty"` @@ -57,6 +58,16 @@ func (b *RoleSpecApplyConfiguration) WithReplicas(value int32) *RoleSpecApplyCon return b } +// WithDependencies adds the given value to the Dependencies field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Dependencies field. 
+func (b *RoleSpecApplyConfiguration) WithDependencies(values ...string) *RoleSpecApplyConfiguration { + for i := range values { + b.Dependencies = append(b.Dependencies, values[i]) + } + return b +} + // WithUpgradeOrder sets the UpgradeOrder field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the UpgradeOrder field is set to the value of the last call. diff --git a/pkg/controller/roleset/rolling.go b/pkg/controller/roleset/rolling.go index fbd8cee2a..5a5625d9f 100644 --- a/pkg/controller/roleset/rolling.go +++ b/pkg/controller/roleset/rolling.go @@ -22,11 +22,10 @@ import ( "math" "strings" - orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" + + orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1" ) type RollingManager interface { @@ -38,22 +37,35 @@ type RollingManagerSequential struct { } func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) { - // 1. ensure pod replica meet expectations + // 1. Find the furthest role whose dependencies are ready + progressIndex := -1 + for i, role := range roleSet.Spec.Roles { + if !isRoleDependenciesReady(roleSet, &role) { + break + } + progressIndex = i + } + + if progressIndex == -1 { + return nil // even A is blocked (shouldn't happen) + } + + // 2. 
Scale all roles from 0 to progressIndex var scaling bool - for _, role := range roleSet.Spec.Roles { - klog.Infof("[RollingManagerSequential.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) - s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role) + for i := 0; i <= progressIndex; i++ { + role := &roleSet.Spec.Roles[i] + s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role) if err != nil { return err } scaling = scaling || s } if scaling { - klog.Infof("[RollingManagerSequential.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name) + // Wait for Pods to be created and status updated return nil } - // 2. Sort roles by upgrade order + // 3. Sort roles by upgrade order klog.Infof("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder") sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles) var sequenceLines []string @@ -72,8 +84,12 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat roleSet.Name, strings.Join(sequenceLines, "\n")) - // 3. do the rollout process for each role by order + // 4. do the rollout process for each role by order for _, role := range sortedRoles { + if !isRoleDependenciesReady(roleSet, &role) { + klog.Infof("Rollout of role %s blocked: dependencies not ready. Stopping sequential update.", role.Name) + break // enforce strict sequential order + } klog.Infof("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role) if err != nil { @@ -95,29 +111,48 @@ type RollingManagerParallel struct { } func (m *RollingManagerParallel) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) { - // 1. ensure pod replica meet expectations + // 1. 
Find the furthest role whose dependencies are ready + progressIndex := -1 + for i, role := range roleSet.Spec.Roles { + if !isRoleDependenciesReady(roleSet, &role) { + break + } + progressIndex = i + } + + if progressIndex == -1 { + return nil // even A is blocked (shouldn't happen) + } + + // 2: Scale all roles from 0 to progressIndex var scaling bool - for _, role := range roleSet.Spec.Roles { - klog.Infof("[RollingManagerParallel.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) - s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role) + for i := 0; i <= progressIndex; i++ { + role := &roleSet.Spec.Roles[i] + s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role) if err != nil { return err } scaling = scaling || s } if scaling { - klog.Infof("[RollingManagerParallel.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name) - return + // Wait for Pods to be created and status updated + return nil } - // 2. do the rollout process for each role + + // 3. do the rollout process for each role for _, role := range roleSet.Spec.Roles { + if !isRoleDependenciesReady(roleSet, &role) { + klog.Infof("Skipping rollout for role %s: "+ + "dependencies not ready (parallel update continues for other roles)", role.Name) + continue // parallel mode, skip only this role + } klog.Infof("[RollingManagerParallel.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role) if err != nil { return err } } - return + return nil } type RollingManagerInterleave struct { @@ -127,21 +162,34 @@ type RollingManagerInterleave struct { // Interleaved rollout: update roles in alternating steps, // using (maxSurge + maxUnavailable) as the step size for all roles func (m *RollingManagerInterleave) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) { - // 1. ensure pod replica meet expectations + // 1. 
Find the furthest role whose dependencies are ready + progressIndex := -1 + for i, role := range roleSet.Spec.Roles { + if !isRoleDependenciesReady(roleSet, &role) { + break + } + progressIndex = i + } + + if progressIndex == -1 { + return nil // even A is blocked (shouldn't happen) + } + + // 2. Scale all roles from 0 to progressIndex var scaling bool - for _, role := range roleSet.Spec.Roles { - klog.Infof("[RollingManagerInterleave.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) - s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role) + for i := 0; i <= progressIndex; i++ { + role := &roleSet.Spec.Roles[i] + s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role) if err != nil { return err } scaling = scaling || s } if scaling { - klog.Infof("[RollingManagerInterleave.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name) + // Wait for Pods to be created and status updated return nil } - // 2. do the rollout process for each role + // 3. 
do the rollout process for each role roleSteps := make(map[string]int32) roleReadyStatus := make(map[string]bool) currentStep := int32(math.MaxInt32) @@ -149,6 +197,11 @@ func (m *RollingManagerInterleave) Next(ctx context.Context, roleSet *orchestrat // Check the current step for each role and determine the minimum step across all roles as the global current step for _, role := range roleSet.Spec.Roles { + if !isRoleDependenciesReady(roleSet, &role) { + klog.Infof("Skipping role %s/%s: dependencies not ready (will retry in next reconcile)", + roleSet.Namespace, role.Name) + continue // interleaved mode: skip only this role, others may proceed + } allReady, currentRoleStep, err := GetRoleSyncer(m.cli, &role).CheckCurrentStep(ctx, roleSet, &role) if err != nil { klog.Errorf("[RollingManagerInterleave.Next] Failed to get current step for role %s in roleset %s/%s: %v", role.Name, roleSet.Namespace, roleSet.Name, err) diff --git a/pkg/controller/roleset/utils.go b/pkg/controller/roleset/utils.go index 79c920f32..dac3fd4e9 100644 --- a/pkg/controller/roleset/utils.go +++ b/pkg/controller/roleset/utils.go @@ -436,3 +436,35 @@ func sortRolesByUpgradeOrder(roles []orchestrationv1alpha1.RoleSpec) []orchestra }) return sortedRoles } + +func isRoleDependenciesReady(roleSet *orchestrationv1alpha1.RoleSet, role *orchestrationv1alpha1.RoleSpec) bool { + if len(role.Dependencies) == 0 { + return true + } + + // Initialize all roles to 0 + roleStatusMap := make(map[string]int32) + for _, r := range roleSet.Spec.Roles { + roleStatusMap[r.Name] = 0 + } + for _, rs := range roleSet.Status.Roles { + roleStatusMap[rs.Name] = rs.ReadyReplicas + } + + for _, depName := range role.Dependencies { + depReady := roleStatusMap[depName] + expected := int32(1) + for _, r := range roleSet.Spec.Roles { + if r.Name == depName && r.Replicas != nil { + expected = *r.Replicas + break + } + } + if depReady < expected { + klog.V(4).Infof("Role %s depends on %s, "+ + "but only %d/%d ready", role.Name, 
depName, depReady, expected) + return false + } + } + return true +} diff --git a/pkg/controller/roleset/utils_test.go b/pkg/controller/roleset/utils_test.go index ec72ec2db..71212219b 100644 --- a/pkg/controller/roleset/utils_test.go +++ b/pkg/controller/roleset/utils_test.go @@ -490,3 +490,179 @@ func TestSortRolesByUpgradeOrder(t *testing.T) { }) } } + +func TestIsRoleDependenciesReady(t *testing.T) { + tests := []struct { + name string + roleSet *orchestrationv1alpha1.RoleSet + role *orchestrationv1alpha1.RoleSpec + expected bool + }{ + { + name: "no dependencies", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "A", + }, + roleSet: &orchestrationv1alpha1.RoleSet{}, + expected: true, + }, + { + name: "dependency ready (explicit replicas=2, ready=2)", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "B", + Dependencies: []string{"A"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Replicas: int32Ptr(2)}, + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + {Name: "A", ReadyReplicas: 2}, + }, + }, + }, + expected: true, + }, + { + name: "dependency not ready (replicas=2, ready=1)", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "B", + Dependencies: []string{"A"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Replicas: int32Ptr(2)}, + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + {Name: "A", ReadyReplicas: 1}, + }, + }, + }, + expected: false, + }, + { + name: "dependency with nil Replicas (defaults to 1), ready=1", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "B", + Dependencies: []string{"A"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A"}, // Replicas 
is nil + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + {Name: "A", ReadyReplicas: 1}, + }, + }, + }, + expected: true, + }, + { + name: "dependency with nil Replicas, ready=0", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "B", + Dependencies: []string{"A"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A"}, + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + {Name: "A", ReadyReplicas: 0}, + }, + }, + }, + expected: false, + }, + { + name: "dependency missing in status (treated as 0 ready)", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "B", + Dependencies: []string{"A"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Replicas: int32Ptr(1)}, + {Name: "B"}, + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + // A is missing in status! 
+ {Name: "B", ReadyReplicas: 1}, + }, + }, + }, + expected: false, // because A's ready = 0 (not in status) + }, + { + name: "multiple dependencies, all ready", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "C", + Dependencies: []string{"A", "B"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Replicas: int32Ptr(1)}, + {Name: "B", Replicas: int32Ptr(3)}, + {Name: "C"}, + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + {Name: "A", ReadyReplicas: 1}, + {Name: "B", ReadyReplicas: 3}, + }, + }, + }, + expected: true, + }, + { + name: "multiple dependencies, one not ready", + role: &orchestrationv1alpha1.RoleSpec{ + Name: "C", + Dependencies: []string{"A", "B"}, + }, + roleSet: &orchestrationv1alpha1.RoleSet{ + Spec: orchestrationv1alpha1.RoleSetSpec{ + Roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Replicas: int32Ptr(1)}, + {Name: "B", Replicas: int32Ptr(3)}, + {Name: "C"}, + }, + }, + Status: orchestrationv1alpha1.RoleSetStatus{ + Roles: []orchestrationv1alpha1.RoleStatus{ + {Name: "A", ReadyReplicas: 1}, + {Name: "B", ReadyReplicas: 2}, // only 2/3 ready + }, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isRoleDependenciesReady(tt.roleSet, tt.role) + assert.Equal(t, tt.expected, result) + }) + } +} + +// Helper +func int32Ptr(i int32) *int32 { return &i } diff --git a/pkg/controller/stormservice/rolesetoperations.go b/pkg/controller/stormservice/rolesetoperations.go index 4a56c9194..1a3e5fe81 100644 --- a/pkg/controller/stormservice/rolesetoperations.go +++ b/pkg/controller/stormservice/rolesetoperations.go @@ -57,6 +57,20 @@ func (r *StormServiceReconciler) getRoleSetList(ctx context.Context, selector *m // renderRoleSet creates a RoleSet with per-role revision annotations func (r *StormServiceReconciler) renderRoleSet(stormService 
*orchestrationv1alpha1.StormService, index *int, revisionName string, roleRevisions map[string]*apps.ControllerRevision) (*orchestrationv1alpha1.RoleSet, error) { + // deep copy the template spec first + spec := stormService.Spec.Template.Spec.DeepCopy() + if spec == nil { + return nil, fmt.Errorf("stormService template spec is nil") + } + + // Sort roles by dependency topological order + sortedRoles, err := r.topologicalSortRolesFromSpec(spec.Roles) + if err != nil { + klog.Warningf("Failed to topologically sort roles in StormService %s/%s: %v. Using original order.", stormService.Namespace, stormService.Name, err) + // Optionally, you can return error to block creation, but for safety we fall back to original order + } else { + spec.Roles = sortedRoles + } roleSet := &orchestrationv1alpha1.RoleSet{ ObjectMeta: metav1.ObjectMeta{ GenerateName: utils.Shorten(fmt.Sprintf("%s-roleset-", stormService.Name), true, true), @@ -67,7 +81,7 @@ func (r *StormServiceReconciler) renderRoleSet(stormService *orchestrationv1alph Labels: utils.DeepCopyMap(stormService.Spec.Template.Labels), Annotations: utils.DeepCopyMap(stormService.Spec.Template.Annotations), }, - Spec: *stormService.Spec.Template.Spec.DeepCopy(), + Spec: *spec, } if roleSet.Labels == nil { roleSet.Labels = make(map[string]string) diff --git a/pkg/controller/stormservice/sync.go b/pkg/controller/stormservice/sync.go index 731ed3daa..fe7682bd7 100644 --- a/pkg/controller/stormservice/sync.go +++ b/pkg/controller/stormservice/sync.go @@ -70,6 +70,88 @@ func (r *StormServiceReconciler) sync(ctx context.Context, stormService *orchest return 0, nil } +// topologicalSortRolesFromSpec sorts roles by their dependencies using Kahn's algorithm. +// Returns sorted []RoleSpec, or error if circular dependency exists. +// TODO(CYJiang): validate role dependencies during StormService creation in webhook to prevent circular dependencies. 
+func (r *StormServiceReconciler) topologicalSortRolesFromSpec(roles []orchestrationv1alpha1.RoleSpec) ([]orchestrationv1alpha1.RoleSpec, error) { + if len(roles) == 0 { + return roles, nil + } + + // Build name -> role map and graph + nameToRole := make(map[string]orchestrationv1alpha1.RoleSpec) + roleNames := make(map[string]bool) + for _, role := range roles { + nameToRole[role.Name] = role + roleNames[role.Name] = true + } + + // Validate dependencies exist + for _, role := range roles { + for _, dep := range role.Dependencies { + if !roleNames[dep] { + return nil, fmt.Errorf("role %q depends on non-existent role %q", role.Name, dep) + } + } + } + + // Build adjacency list and in-degree + graph := make(map[string][]string) // dep -> [dependents] + inDegree := make(map[string]int) + + for _, name := range getRoleNames(roles) { + graph[name] = []string{} + inDegree[name] = 0 + } + + for _, role := range roles { + for _, dep := range role.Dependencies { + graph[dep] = append(graph[dep], role.Name) + inDegree[role.Name]++ + } + } + + // Kahn's algorithm + queue := []string{} + for name, deg := range inDegree { + if deg == 0 { + queue = append(queue, name) + } + } + + resultNames := []string{} + for len(queue) > 0 { + cur := queue[0] + queue = queue[1:] + resultNames = append(resultNames, cur) + for _, next := range graph[cur] { + inDegree[next]-- + if inDegree[next] == 0 { + queue = append(queue, next) + } + } + } + + if len(resultNames) != len(roles) { + return nil, fmt.Errorf("circular dependency detected among roles") + } + + // Reconstruct sorted RoleSpec slice + sorted := make([]orchestrationv1alpha1.RoleSpec, len(resultNames)) + for i, name := range resultNames { + sorted[i] = nameToRole[name] + } + return sorted, nil +} + +func getRoleNames(roles []orchestrationv1alpha1.RoleSpec) []string { + names := make([]string, len(roles)) + for i, r := range roles { + names[i] = r.Name + } + return names +} + func (r *StormServiceReconciler) syncHeadlessService(ctx 
context.Context, service *orchestrationv1alpha1.StormService) error { expectedService := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controller/stormservice/sync_test.go b/pkg/controller/stormservice/sync_test.go index 1b0fef016..e99d05e3c 100644 --- a/pkg/controller/stormservice/sync_test.go +++ b/pkg/controller/stormservice/sync_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -279,3 +280,130 @@ func TestSyncHeadlessService(t *testing.T) { }) } } + +func TestTopologicalSortRolesFromSpec(t *testing.T) { + tests := []struct { + name string + roles []orchestrationv1alpha1.RoleSpec + expectError bool + expectOrder []string + isDeterministic bool // if true, expect exact order; otherwise only validate dependency constraints + }{ + { + name: "empty roles", + roles: []orchestrationv1alpha1.RoleSpec{}, + expectError: false, + expectOrder: []string{}, + isDeterministic: true, + }, + { + name: "no dependencies (non-deterministic)", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A"}, + {Name: "B"}, + {Name: "C"}, + }, + expectError: false, + expectOrder: nil, // don't check exact order + isDeterministic: false, + }, + { + name: "linear dependency: A → B → C", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "C", Dependencies: []string{"B"}}, + {Name: "B", Dependencies: []string{"A"}}, + {Name: "A"}, + }, + expectError: false, + expectOrder: []string{"A", "B", "C"}, + isDeterministic: true, + }, + { + name: "diamond dependency", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "D", Dependencies: []string{"B", "C"}}, + {Name: "B", Dependencies: []string{"A"}}, + {Name: "C", Dependencies: []string{"A"}}, + {Name: "A"}, + }, + expectError: false, + expectOrder: nil, + isDeterministic: false, // multiple valid orders + }, + { + name: "circular dependency: A→B→C→A", + roles: 
[]orchestrationv1alpha1.RoleSpec{ + {Name: "A", Dependencies: []string{"C"}}, + {Name: "B", Dependencies: []string{"A"}}, + {Name: "C", Dependencies: []string{"B"}}, + }, + expectError: true, + }, + { + name: "dependency on non-existent role", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Dependencies: []string{"NonExistent"}}, + }, + expectError: true, + }, + { + name: "self dependency (circular)", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "A", Dependencies: []string{"A"}}, + }, + expectError: true, + }, + { + name: "multiple roots and leaves", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "X"}, + {Name: "Y"}, + {Name: "Z", Dependencies: []string{"X", "Y"}}, + {Name: "W", Dependencies: []string{"X"}}, + }, + expectError: false, + expectOrder: nil, + isDeterministic: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &StormServiceReconciler{} + sorted, err := r.topologicalSortRolesFromSpec(tt.roles) + + if tt.expectError { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, len(tt.roles), len(sorted)) + + // Map role name to its position in result + nameToIndex := make(map[string]int, len(sorted)) + for i, role := range sorted { + nameToIndex[role.Name] = i + } + + // Validate: every dependency must appear before its dependent + for _, role := range sorted { + for _, dep := range role.Dependencies { + depIdx, exists := nameToIndex[dep] + assert.True(t, exists, "dependency %q not found in output", dep) + curIdx := nameToIndex[role.Name] + assert.Less(t, depIdx, curIdx, "dependency %q must come before %q", dep, role.Name) + } + } + + // If deterministic, check exact order + if tt.isDeterministic { + actualNames := make([]string, len(sorted)) + for i, role := range sorted { + actualNames[i] = role.Name + } + assert.Equal(t, tt.expectOrder, actualNames) + } + }) + } +} diff --git a/test/integration/controller/roleset_test.go b/test/integration/controller/roleset_test.go 
index 0f22f69ae..d2a111e2f 100644 --- a/test/integration/controller/roleset_test.go +++ b/test/integration/controller/roleset_test.go @@ -45,6 +45,10 @@ const ( decodeImageVersionV2 = "decode:v2" routerImageVersionV1 = "router:v1" routerImageVersionV2 = "router:v2" + ingressRoleName = "ingress" + decodeRoleName = "decode" + prefillRoleName = "prefill" + postprocessRoleName = "postprocess" ) var _ = ginkgo.Describe("RoleSet controller test", func() { @@ -425,5 +429,144 @@ var _ = ginkgo.Describe("RoleSet controller test", func() { }, }, ), + ginkgo.Entry("respect role dependencies during scale-up", + &testValidatingCase{ + makeRoleSet: func() *orchestrationapi.RoleSet { + int32Ptr := func(i int32) *int32 { return &i } + // ingress: no deps, starts first + ingressRole := orchestrationapi.RoleSpec{ + Name: ingressRoleName, + Replicas: int32Ptr(1), + Template: validation.MakePodTemplate("nginx:alpine"), + // No Dependencies → starts immediately + } + + // decode: depends on ingress + decodeRole := orchestrationapi.RoleSpec{ + Name: decodeRoleName, + Replicas: int32Ptr(2), + Dependencies: []string{ingressRoleName}, + Template: validation.MakePodTemplate("ghcr.io/llm-d/llm-d-inference-sim:latest"), + } + + // prefill: depends on decode (intentionally reversed) + prefillRole := orchestrationapi.RoleSpec{ + Name: prefillRoleName, + Replicas: int32Ptr(2), + Dependencies: []string{decodeRoleName}, + Template: validation.MakePodTemplate("ghcr.io/llm-d/llm-d-inference-sim:latest"), + } + + // postprocess: depends on both + postprocessRole := orchestrationapi.RoleSpec{ + Name: postprocessRoleName, + Replicas: int32Ptr(2), + Dependencies: []string{prefillRoleName, decodeRoleName}, + Template: validation.MakePodTemplate("alpine:latest"), + } + + return wrapper.MakeRoleSet("dependency-test"). + Namespace(ns.Name). + UpdateStrategy(orchestrationapi.SequentialRoleSetStrategyType). + WithRoleAdvanced(ingressRole). + WithRoleAdvanced(decodeRole). + WithRoleAdvanced(prefillRole). 
+ WithRoleAdvanced(postprocessRole). + Obj() + }, + updates: []*update{ + // Stage 1: Create RoleSet → only ingress pods appear + { + updateFunc: func(rs *orchestrationapi.RoleSet) { + gomega.Expect(k8sClient.Create(ctx, rs)).To(gomega.Succeed()) + + // Only ingress (1 pod) should exist initially + validation.WaitForPodsCreated(ctx, k8sClient, ns.Name, + constants.RoleSetNameLabelKey, rs.Name, 1) + + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, rs *orchestrationapi.RoleSet) { + // Mark current pods (ingress) as ready + validation.MarkPodsReady(ctx, k8sClient, ns.Name, constants.RoleNameLabelKey, ingressRoleName) + // Verify one roles + gomega.Eventually(func(g gomega.Gomega) { + latest := &orchestrationapi.RoleSet{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rs), latest)).To(gomega.Succeed()) + g.Expect(validation.ValidateRoleStatus(latest, ingressRoleName, 1)).To(gomega.Succeed()) + }, time.Second*10).Should(gomega.Succeed()) + }, + }, + // Stage 2: decode pods should now be created (2 pods) + { + updateFunc: func(rs *orchestrationapi.RoleSet) { + // Total pods: 1 (ingress) + 2 (decode) = 3 + validation.WaitForPodsCreated(ctx, k8sClient, ns.Name, + constants.RoleSetNameLabelKey, rs.Name, 3) + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, rs *orchestrationapi.RoleSet) { + // Mark all decode pods as ready + validation.MarkPodsReady(ctx, k8sClient, ns.Name, constants.RoleNameLabelKey, decodeRoleName) + // Verify two roles + gomega.Eventually(func(g gomega.Gomega) { + latest := &orchestrationapi.RoleSet{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rs), latest)).To(gomega.Succeed()) + g.Expect(validation.ValidateRoleStatus(latest, ingressRoleName, 1)).To(gomega.Succeed()) + g.Expect(validation.ValidateRoleStatus(latest, decodeRoleName, 2)).To(gomega.Succeed()) + }, time.Second*10).Should(gomega.Succeed()) + }, + }, + // Stage 3: prefill pods created (2 more → total 5) + { + updateFunc: func(rs 
*orchestrationapi.RoleSet) { + validation.WaitForPodsCreated(ctx, k8sClient, ns.Name, + constants.RoleSetNameLabelKey, rs.Name, 5) + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, rs *orchestrationapi.RoleSet) { + // Mark prefill pods as ready + validation.MarkPodsReady(ctx, k8sClient, ns.Name, constants.RoleNameLabelKey, prefillRoleName) + // Verify three roles + gomega.Eventually(func(g gomega.Gomega) { + latest := &orchestrationapi.RoleSet{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rs), latest)).To(gomega.Succeed()) + g.Expect(validation.ValidateRoleStatus(latest, ingressRoleName, 1)).To(gomega.Succeed()) + g.Expect(validation.ValidateRoleStatus(latest, decodeRoleName, 2)).To(gomega.Succeed()) + g.Expect(validation.ValidateRoleStatus(latest, prefillRoleName, 2)).To(gomega.Succeed()) + }, time.Second*10).Should(gomega.Succeed()) + }, + }, + // Stage 4: postprocess pods created (2 more → total 7) + { + updateFunc: func(rs *orchestrationapi.RoleSet) { + validation.WaitForPodsCreated(ctx, k8sClient, ns.Name, + constants.RoleSetNameLabelKey, rs.Name, 7) + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, rs *orchestrationapi.RoleSet) { + // Final mark + validation.MarkPodsReady(ctx, k8sClient, ns.Name, constants.RoleNameLabelKey, postprocessRoleName) + // Validate final status + gomega.Eventually(func() error { + latest := &orchestrationapi.RoleSet{} + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(rs), latest); err != nil { + return err + } + if err := validation.ValidateRoleStatus(latest, ingressRoleName, 1); err != nil { + return err + } + if err := validation.ValidateRoleStatus(latest, decodeRoleName, 2); err != nil { + return err + } + if err := validation.ValidateRoleStatus(latest, prefillRoleName, 2); err != nil { + return err + } + if err := validation.ValidateRoleStatus(latest, postprocessRoleName, 2); err != nil { + return err + } + return nil + }, time.Second*30).Should(gomega.Succeed()) + }, + 
}, + }, + }, + ), ) }) diff --git a/test/integration/controller/stormservice_test.go b/test/integration/controller/stormservice_test.go index 7839ad01c..c08df3243 100644 --- a/test/integration/controller/stormservice_test.go +++ b/test/integration/controller/stormservice_test.go @@ -33,6 +33,11 @@ import ( "github.com/vllm-project/aibrix/test/utils/wrapper" ) +const ( + headRoleName = "head" + workerRoleName = "worker" +) + var _ = ginkgo.Describe("StormService controller test", func() { var ns *corev1.Namespace @@ -189,6 +194,127 @@ var _ = ginkgo.Describe("StormService controller test", func() { }, }, ), + ginkgo.Entry("respect role dependencies in StormService with multiple replicas", + &testValidatingCase{ + makeStormService: func() *orchestrationapi.StormService { + matchLabel := map[string]string{"app": "storm-deps-test"} + podTemplate := corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: matchLabel, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "dummy", Image: "alpine:latest", Command: []string{"sleep", "3600"}}, + }, + }, + } + int32Ptr := func(i int32) *int32 { return &i } + + roleSetSpec := &orchestrationapi.RoleSetSpec{ + Roles: []orchestrationapi.RoleSpec{ + { + Name: headRoleName, + Replicas: int32Ptr(1), + Template: podTemplate, + // No dependencies + }, + { + Name: workerRoleName, + Replicas: int32Ptr(2), + Dependencies: []string{headRoleName}, + Template: podTemplate, + }, + }, + UpdateStrategy: orchestrationapi.SequentialRoleSetStrategyType, + } + + return wrapper.MakeStormService("stormservice-with-deps"). + Namespace(ns.Name). + Replicas(ptr.To(int32(2))). // ← 2 RoleSets + Selector(metav1.SetAsLabelSelector(matchLabel)). + UpdateStrategyType(orchestrationapi.InPlaceUpdateStormServiceStrategyType). + RoleSetTemplateMeta(metav1.ObjectMeta{Labels: matchLabel}, roleSetSpec). 
+ Obj() + }, + updates: []*update{ + { + updateFunc: func(ss *orchestrationapi.StormService) { + gomega.Expect(k8sClient.Create(ctx, ss)).To(gomega.Succeed()) + + // Wait for 2 RoleSets to be created + validation.WaitForRoleSetsCreated(ctx, k8sClient, ns.Name, ss.Name, 2) + + // Initially, only head pods exist: 2 RoleSets × 1 head = 2 pods + validation.WaitForPodsCreated(ctx, k8sClient, ns.Name, + constants.StormServiceNameLabelKey, ss.Name, 2) + + // Mark ALL head pods as ready → unblock worker creation in both RoleSets + validation.MarkPodsReady(ctx, k8sClient, ns.Name, + constants.RoleNameLabelKey, headRoleName) + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, ss *orchestrationapi.StormService) { + // Validate: only 'head' roles exist across all RoleSets + gomega.Eventually(func(g gomega.Gomega) { + roleSets := &orchestrationapi.RoleSetList{} + g.Expect(k8sClient.List(ctx, roleSets, + client.InNamespace(ns.Name), + client.MatchingLabels{constants.StormServiceNameLabelKey: ss.Name})). 
+ To(gomega.Succeed()) + + g.Expect(roleSets.Items).To(gomega.HaveLen(2)) + for _, rs := range roleSets.Items { + // Each RoleSet should have head=1 ready; worker pods are dependency-gated and are intentionally not asserted here (they may or may not exist yet once head is marked ready) + g.Expect(validation.ValidateRoleStatus(&rs, headRoleName, 1)).To(gomega.Succeed()) + } + }, time.Second*10).Should(gomega.Succeed()) + + }, + }, + { + updateFunc: func(ss *orchestrationapi.StormService) { + // After marking head ready, workers should be created + // Total pods = 2 RoleSets × (1 head + 2 worker) = 6 + validation.WaitForPodsCreated(ctx, k8sClient, ns.Name, + constants.StormServiceNameLabelKey, ss.Name, 6) + + // Mark worker pods ready (optional, but completes lifecycle) + validation.MarkPodsReady(ctx, k8sClient, ns.Name, + constants.RoleNameLabelKey, workerRoleName) + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, ss *orchestrationapi.StormService) { + // Validate final RoleStatuses in StormService + gomega.Eventually(func(g gomega.Gomega) { + latest := &orchestrationapi.StormService{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(ss), latest)).To(gomega.Succeed()) + + // Aggregate role statuses: head=2, worker=4 + foundHead := false + foundWorker := false + for _, rs := range latest.Status.RoleStatuses { + if rs.Name == headRoleName { + g.Expect(rs.Replicas).To(gomega.Equal(int32(2))) + g.Expect(rs.ReadyReplicas).To(gomega.Equal(int32(2))) + foundHead = true + } + if rs.Name == workerRoleName { + g.Expect(rs.Replicas).To(gomega.Equal(int32(4))) + g.Expect(rs.ReadyReplicas).To(gomega.Equal(int32(4))) + foundWorker = true + } + } + g.Expect(foundHead).To(gomega.BeTrue()) + g.Expect(foundWorker).To(gomega.BeTrue()) + + // Validate top-level status + g.Expect(latest.Status.Replicas).To(gomega.Equal(int32(2))) // 2 RoleSets + g.Expect(latest.Status.ReadyReplicas).To(gomega.Equal(int32(2))) // both ready + }, time.Second*15).Should(gomega.Succeed()) + + }, + }, + }, + }, + ), // TODO: add more test cases for different update strategies, stateful
services, etc. ) })