Skip to content

Commit 972ffcc

Browse files
committed
feat(roles): add support for role dependencies to enforce startup order
Signed-off-by: CYJiang <googs1025@gmail.com>
1 parent 981c739 commit 972ffcc

File tree

15 files changed

+835
-25
lines changed

15 files changed

+835
-25
lines changed

api/orchestration/v1alpha1/roleset_types.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,30 @@ type RoleSpec struct {
150150
// +optional
151151
Replicas *int32 `json:"replicas,omitempty"`
152152

153+
// Dependencies specifies the list of role names that must be ready before this role starts or is updated.
154+
// This is used to define a dependency graph between different roles, ensuring correct ordering during
155+
// both initial scale-up and rolling updates. Each element in the slice should correspond to the name
156+
// of another role defined within the same StormService specification.
157+
//
158+
// For example, if RoleA depends on RoleB and RoleC, then RoleA's Dependencies would look like:
159+
// Dependencies: []string{"RoleB", "RoleC"}
160+
//
161+
// - **During scale-up (initial startup or scaling from zero):**
162+
// RoleA will not begin creating Pods until all its dependencies (e.g., RoleB and RoleC) have reached
163+
// their desired number of ready replicas.
164+
//
165+
// - **During rolling updates:**
166+
// RoleA will not proceed to update its existing Pods if any of its dependencies are not currently ready.
167+
// This includes cases where a dependency is failing, has zero ready replicas, or is itself undergoing an update.
168+
// Note: Controller **does not automatically trigger updates** on dependent roles (e.g., RoleB or RoleC).
169+
// It only checks whether they are currently ready. You must explicitly update dependencies if needed.
170+
//
171+
// In both cases, if a dependency becomes unready at any time, RoleA’s scale-up or rollout will be paused
172+
// until all dependencies are satisfied again.
173+
//
174+
// +optional
175+
Dependencies []string `json:"dependencies,omitempty"`
176+
153177
// UpgradeOrder specifies the order in which this role should be upgraded.
154178
// Lower values are upgraded first. If not specified, roles upgrade after all explicitly ordered roles.
155179
// +optional

api/orchestration/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ spec:
3838
roles:
3939
items:
4040
properties:
41+
dependencies:
42+
items:
43+
type: string
44+
type: array
4145
disruptionTolerance:
4246
properties:
4347
maxUnavailable:

config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ spec:
113113
roles:
114114
items:
115115
properties:
116+
dependencies:
117+
items:
118+
type: string
119+
type: array
116120
disruptionTolerance:
117121
properties:
118122
maxUnavailable:

dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ spec:
3838
roles:
3939
items:
4040
properties:
41+
dependencies:
42+
items:
43+
type: string
44+
type: array
4145
disruptionTolerance:
4246
properties:
4347
maxUnavailable:

dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ spec:
113113
roles:
114114
items:
115115
properties:
116+
dependencies:
117+
items:
118+
type: string
119+
type: array
116120
disruptionTolerance:
117121
properties:
118122
maxUnavailable:

pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller/roleset/rolling.go

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ import (
2222
"math"
2323
"strings"
2424

25-
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
26-
2725
"k8s.io/klog/v2"
28-
2926
"sigs.k8s.io/controller-runtime/pkg/client"
27+
28+
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
3029
)
3130

3231
type RollingManager interface {
@@ -38,22 +37,35 @@ type RollingManagerSequential struct {
3837
}
3938

4039
func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
41-
// 1. ensure pod replica meet expectations
40+
// 1. Find the furthest role whose dependencies are ready
41+
progressIndex := -1
42+
for i, role := range roleSet.Spec.Roles {
43+
if !isRoleDependenciesReady(roleSet, &role) {
44+
break
45+
}
46+
progressIndex = i
47+
}
48+
49+
if progressIndex == -1 {
50+
return nil // even A is blocked (shouldn't happen)
51+
}
52+
53+
// 2. Scale all roles from 0 to progressIndex
4254
var scaling bool
43-
for _, role := range roleSet.Spec.Roles {
44-
klog.Infof("[RollingManagerSequential.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
45-
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
55+
for i := 0; i <= progressIndex; i++ {
56+
role := &roleSet.Spec.Roles[i]
57+
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
4658
if err != nil {
4759
return err
4860
}
4961
scaling = scaling || s
5062
}
5163
if scaling {
52-
klog.Infof("[RollingManagerSequential.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
64+
// Wait for Pods to be created and status updated
5365
return nil
5466
}
5567

56-
// 2. Sort roles by upgrade order
68+
// 3. Sort roles by upgrade order
5769
klog.Infof("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder")
5870
sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles)
5971
var sequenceLines []string
@@ -72,8 +84,12 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat
7284
roleSet.Name,
7385
strings.Join(sequenceLines, "\n"))
7486

75-
// 3. do the rollout process for each role by order
87+
// 4. do the rollout process for each role by order
7688
for _, role := range sortedRoles {
89+
if !isRoleDependenciesReady(roleSet, &role) {
90+
klog.Infof("Rollout of role %s blocked: dependencies not ready. Stopping sequential update.", role.Name)
91+
break // enforce strict sequential order
92+
}
7793
klog.Infof("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
7894
err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role)
7995
if err != nil {
@@ -95,29 +111,48 @@ type RollingManagerParallel struct {
95111
}
96112

97113
func (m *RollingManagerParallel) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
98-
// 1. ensure pod replica meet expectations
114+
// 1. Find the furthest role whose dependencies are ready
115+
progressIndex := -1
116+
for i, role := range roleSet.Spec.Roles {
117+
if !isRoleDependenciesReady(roleSet, &role) {
118+
break
119+
}
120+
progressIndex = i
121+
}
122+
123+
if progressIndex == -1 {
124+
return nil // even A is blocked (shouldn't happen)
125+
}
126+
127+
// 2: Scale all roles from 0 to progressIndex
99128
var scaling bool
100-
for _, role := range roleSet.Spec.Roles {
101-
klog.Infof("[RollingManagerParallel.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
102-
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
129+
for i := 0; i <= progressIndex; i++ {
130+
role := &roleSet.Spec.Roles[i]
131+
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
103132
if err != nil {
104133
return err
105134
}
106135
scaling = scaling || s
107136
}
108137
if scaling {
109-
klog.Infof("[RollingManagerParallel.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
110-
return
138+
// Wait for Pods to be created and status updated
139+
return nil
111140
}
112-
// 2. do the rollout process for each role
141+
142+
// 3. do the rollout process for each role
113143
for _, role := range roleSet.Spec.Roles {
144+
if !isRoleDependenciesReady(roleSet, &role) {
145+
klog.Infof("Skipping rollout for role %s: "+
146+
"dependencies not ready (parallel update continues for other roles)", role.Name)
147+
continue // parallel mode, skip only this role
148+
}
114149
klog.Infof("[RollingManagerParallel.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
115150
err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role)
116151
if err != nil {
117152
return err
118153
}
119154
}
120-
return
155+
return nil
121156
}
122157

123158
type RollingManagerInterleave struct {
@@ -127,28 +162,46 @@ type RollingManagerInterleave struct {
127162
// Interleaved rollout: update roles in alternating steps,
128163
// using (maxSurge + maxUnavailable) as the step size for all roles
129164
func (m *RollingManagerInterleave) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
130-
// 1. ensure pod replica meet expectations
165+
// 1. Find the furthest role whose dependencies are ready
166+
progressIndex := -1
167+
for i, role := range roleSet.Spec.Roles {
168+
if !isRoleDependenciesReady(roleSet, &role) {
169+
break
170+
}
171+
progressIndex = i
172+
}
173+
174+
if progressIndex == -1 {
175+
return nil // even A is blocked (shouldn't happen)
176+
}
177+
178+
// 2. Scale all roles from 0 to progressIndex
131179
var scaling bool
132-
for _, role := range roleSet.Spec.Roles {
133-
klog.Infof("[RollingManagerInterleave.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
134-
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
180+
for i := 0; i <= progressIndex; i++ {
181+
role := &roleSet.Spec.Roles[i]
182+
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
135183
if err != nil {
136184
return err
137185
}
138186
scaling = scaling || s
139187
}
140188
if scaling {
141-
klog.Infof("[RollingManagerInterleave.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
189+
// Wait for Pods to be created and status updated
142190
return nil
143191
}
144-
// 2. do the rollout process for each role
192+
// 3. do the rollout process for each role
145193
roleSteps := make(map[string]int32)
146194
roleReadyStatus := make(map[string]bool)
147195
currentStep := int32(math.MaxInt32)
148196
allRolesReady := true
149197

150198
// Check the current step for each role and determine the minimum step across all roles as the global current step
151199
for _, role := range roleSet.Spec.Roles {
200+
if !isRoleDependenciesReady(roleSet, &role) {
201+
klog.Infof("Skipping role %s/%s: dependencies not ready (will retry in next reconcile)",
202+
roleSet.Namespace, role.Name)
203+
continue // interleaved mode: skip only this role, others may proceed
204+
}
152205
allReady, currentRoleStep, err := GetRoleSyncer(m.cli, &role).CheckCurrentStep(ctx, roleSet, &role)
153206
if err != nil {
154207
klog.Errorf("[RollingManagerInterleave.Next] Failed to get current step for role %s in roleset %s/%s: %v", role.Name, roleSet.Namespace, roleSet.Name, err)

pkg/controller/roleset/utils.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,3 +436,35 @@ func sortRolesByUpgradeOrder(roles []orchestrationv1alpha1.RoleSpec) []orchestra
436436
})
437437
return sortedRoles
438438
}
439+
440+
func isRoleDependenciesReady(roleSet *orchestrationv1alpha1.RoleSet, role *orchestrationv1alpha1.RoleSpec) bool {
441+
if len(role.Dependencies) == 0 {
442+
return true
443+
}
444+
445+
// Initialize all roles to 0
446+
roleStatusMap := make(map[string]int32)
447+
for _, r := range roleSet.Spec.Roles {
448+
roleStatusMap[r.Name] = 0
449+
}
450+
for _, rs := range roleSet.Status.Roles {
451+
roleStatusMap[rs.Name] = rs.ReadyReplicas
452+
}
453+
454+
for _, depName := range role.Dependencies {
455+
depReady := roleStatusMap[depName]
456+
expected := int32(1)
457+
for _, r := range roleSet.Spec.Roles {
458+
if r.Name == depName && r.Replicas != nil {
459+
expected = *r.Replicas
460+
break
461+
}
462+
}
463+
if depReady < expected {
464+
klog.V(4).Infof("Role %s depends on %s, "+
465+
"but only %d/%d ready", role.Name, depName, depReady, expected)
466+
return false
467+
}
468+
}
469+
return true
470+
}

0 commit comments

Comments
 (0)