Skip to content

Commit e7f3694

Browse files
committed
feat(roles): add support for role dependencies to enforce startup order
Signed-off-by: CYJiang <googs1025@gmail.com>
1 parent b67d47a commit e7f3694

File tree

13 files changed

+544
-25
lines changed

13 files changed

+544
-25
lines changed

api/orchestration/v1alpha1/roleset_types.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,22 @@ type RoleSpec struct {
150150
// +optional
151151
Replicas *int32 `json:"replicas,omitempty"`
152152

153+
// Dependencies specifies the list of role names that must be ready before this role starts.
154+
// This is used to define a dependency graph between different roles, ensuring that certain
155+
// roles only start once their dependencies have successfully started and are ready.
156+
// Each element in the slice should correspond to the name of another role defined within
157+
// the same StormService specification.
158+
//
159+
// For example, if RoleA depends on RoleB and RoleC, then RoleA's Dependencies would look like:
160+
// Dependencies: []string{"RoleB", "RoleC"}
161+
//
162+
// The system will ensure that RoleA does not begin its startup process until both RoleB and
163+
// RoleC have been marked as ready. If any specified dependency fails to start or becomes not
164+
// ready, RoleA's startup (or restart) will be delayed until all dependencies are satisfied.
165+
//
166+
// +optional
167+
Dependencies []string `json:"dependencies,omitempty"`
168+
153169
// UpgradeOrder specifies the order in which this role should be upgraded.
154170
// Lower values are upgraded first. If not specified, roles upgrade after all explicitly ordered roles.
155171
// +optional

api/orchestration/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ spec:
3838
roles:
3939
items:
4040
properties:
41+
dependencies:
42+
items:
43+
type: string
44+
type: array
4145
disruptionTolerance:
4246
properties:
4347
maxUnavailable:

config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ spec:
113113
roles:
114114
items:
115115
properties:
116+
dependencies:
117+
items:
118+
type: string
119+
type: array
116120
disruptionTolerance:
117121
properties:
118122
maxUnavailable:

dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ spec:
3838
roles:
3939
items:
4040
properties:
41+
dependencies:
42+
items:
43+
type: string
44+
type: array
4145
disruptionTolerance:
4246
properties:
4347
maxUnavailable:

dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ spec:
113113
roles:
114114
items:
115115
properties:
116+
dependencies:
117+
items:
118+
type: string
119+
type: array
116120
disruptionTolerance:
117121
properties:
118122
maxUnavailable:

pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller/roleset/rolling.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ import (
2222
"math"
2323
"strings"
2424

25-
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
26-
2725
"k8s.io/klog/v2"
28-
2926
"sigs.k8s.io/controller-runtime/pkg/client"
27+
28+
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
3029
)
3130

3231
type RollingManager interface {
@@ -38,22 +37,35 @@ type RollingManagerSequential struct {
3837
}
3938

4039
func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
41-
// 1. ensure pod replica meet expectations
40+
// 1. Find the furthest role whose dependencies are ready
41+
progressIndex := -1
42+
for i, role := range roleSet.Spec.Roles {
43+
if !isRoleDependenciesReady(roleSet, &role) {
44+
break
45+
}
46+
progressIndex = i
47+
}
48+
49+
if progressIndex == -1 {
50+
return nil // even A is blocked (shouldn't happen)
51+
}
52+
53+
// 2. Scale all roles from 0 to progressIndex
4254
var scaling bool
43-
for _, role := range roleSet.Spec.Roles {
44-
klog.Infof("[RollingManagerSequential.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
45-
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
55+
for i := 0; i <= progressIndex; i++ {
56+
role := &roleSet.Spec.Roles[i]
57+
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
4658
if err != nil {
4759
return err
4860
}
4961
scaling = scaling || s
5062
}
5163
if scaling {
52-
klog.Infof("[RollingManagerSequential.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
64+
// Wait for Pods to be created and status updated
5365
return nil
5466
}
5567

56-
// 2. Sort roles by upgrade order
68+
// 3. Sort roles by upgrade order
5769
klog.Infof("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder")
5870
sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles)
5971
var sequenceLines []string
@@ -72,7 +84,7 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat
7284
roleSet.Name,
7385
strings.Join(sequenceLines, "\n"))
7486

75-
// 3. do the rollout process for each role by order
87+
// 4. do the rollout process for each role by order
7688
for _, role := range sortedRoles {
7789
klog.Infof("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
7890
err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role)
@@ -95,29 +107,43 @@ type RollingManagerParallel struct {
95107
}
96108

97109
func (m *RollingManagerParallel) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
98-
// 1. ensure pod replica meet expectations
110+
// 1. Find the furthest role whose dependencies are ready
111+
progressIndex := -1
112+
for i, role := range roleSet.Spec.Roles {
113+
if !isRoleDependenciesReady(roleSet, &role) {
114+
break
115+
}
116+
progressIndex = i
117+
}
118+
119+
if progressIndex == -1 {
120+
return nil // even A is blocked (shouldn't happen)
121+
}
122+
123+
// 2: Scale all roles from 0 to progressIndex
99124
var scaling bool
100-
for _, role := range roleSet.Spec.Roles {
101-
klog.Infof("[RollingManagerParallel.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
102-
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
125+
for i := 0; i <= progressIndex; i++ {
126+
role := &roleSet.Spec.Roles[i]
127+
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
103128
if err != nil {
104129
return err
105130
}
106131
scaling = scaling || s
107132
}
108133
if scaling {
109-
klog.Infof("[RollingManagerParallel.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
110-
return
134+
// Wait for Pods to be created and status updated
135+
return nil
111136
}
112-
// 2. do the rollout process for each role
137+
138+
// 3. do the rollout process for each role
113139
for _, role := range roleSet.Spec.Roles {
114140
klog.Infof("[RollingManagerParallel.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
115141
err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role)
116142
if err != nil {
117143
return err
118144
}
119145
}
120-
return
146+
return nil
121147
}
122148

123149
type RollingManagerInterleave struct {
@@ -127,21 +153,34 @@ type RollingManagerInterleave struct {
127153
// Interleaved rollout: update roles in alternating steps,
128154
// using (maxSurge + maxUnavailable) as the step size for all roles
129155
func (m *RollingManagerInterleave) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
130-
// 1. ensure pod replica meet expectations
156+
// 1. Find the furthest role whose dependencies are ready
157+
progressIndex := -1
158+
for i, role := range roleSet.Spec.Roles {
159+
if !isRoleDependenciesReady(roleSet, &role) {
160+
break
161+
}
162+
progressIndex = i
163+
}
164+
165+
if progressIndex == -1 {
166+
return nil // even A is blocked (shouldn't happen)
167+
}
168+
169+
// 2. Scale all roles from 0 to progressIndex
131170
var scaling bool
132-
for _, role := range roleSet.Spec.Roles {
133-
klog.Infof("[RollingManagerInterleave.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
134-
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
171+
for i := 0; i <= progressIndex; i++ {
172+
role := &roleSet.Spec.Roles[i]
173+
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
135174
if err != nil {
136175
return err
137176
}
138177
scaling = scaling || s
139178
}
140179
if scaling {
141-
klog.Infof("[RollingManagerInterleave.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
180+
// Wait for Pods to be created and status updated
142181
return nil
143182
}
144-
// 2. do the rollout process for each role
183+
// 3. do the rollout process for each role
145184
roleSteps := make(map[string]int32)
146185
roleReadyStatus := make(map[string]bool)
147186
currentStep := int32(math.MaxInt32)

pkg/controller/roleset/utils.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,3 +436,35 @@ func sortRolesByUpgradeOrder(roles []orchestrationv1alpha1.RoleSpec) []orchestra
436436
})
437437
return sortedRoles
438438
}
439+
440+
func isRoleDependenciesReady(roleSet *orchestrationv1alpha1.RoleSet, role *orchestrationv1alpha1.RoleSpec) bool {
441+
if len(role.Dependencies) == 0 {
442+
return true
443+
}
444+
445+
// Initialize all roles to 0
446+
roleStatusMap := make(map[string]int32)
447+
for _, r := range roleSet.Spec.Roles {
448+
roleStatusMap[r.Name] = 0
449+
}
450+
for _, rs := range roleSet.Status.Roles {
451+
roleStatusMap[rs.Name] = rs.ReadyReplicas
452+
}
453+
454+
for _, depName := range role.Dependencies {
455+
depReady := roleStatusMap[depName]
456+
expected := int32(1)
457+
for _, r := range roleSet.Spec.Roles {
458+
if r.Name == depName && r.Replicas != nil {
459+
expected = *r.Replicas
460+
break
461+
}
462+
}
463+
if depReady < expected {
464+
klog.V(4).Infof("Role %s depends on %s, "+
465+
"but only %d/%d ready", role.Name, depName, depReady, expected)
466+
return false
467+
}
468+
}
469+
return true
470+
}

0 commit comments

Comments
 (0)