@@ -22,11 +22,10 @@ import (
2222 "math"
2323 "strings"
2424
25- orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
26-
2725 "k8s.io/klog/v2"
28-
2926 "sigs.k8s.io/controller-runtime/pkg/client"
27+
28+ orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
3029)
3130
3231type RollingManager interface {
@@ -38,22 +37,35 @@ type RollingManagerSequential struct {
3837}
3938
4039func (m * RollingManagerSequential ) Next (ctx context.Context , roleSet * orchestrationv1alpha1.RoleSet ) (err error ) {
41- // 1. ensure pod replica meet expectations
40+ // 1. Find the furthest role whose dependencies are ready
41+ progressIndex := - 1
42+ for i , role := range roleSet .Spec .Roles {
43+ if ! isRoleDependenciesReady (roleSet , & role ) {
44+ break
45+ }
46+ progressIndex = i
47+ }
48+
49+ if progressIndex == - 1 {
50+ return nil // even A is blocked (shouldn't happen)
51+ }
52+
53+ // 2. Scale all roles from 0 to progressIndex
4254 var scaling bool
43- for _ , role := range roleSet . Spec . Roles {
44- klog . Infof ( "[RollingManagerSequential.Next] start to scale roleset %s/%s role %s" , roleSet .Namespace , roleSet . Name , role . Name )
45- s , err := GetRoleSyncer (m .cli , & role ).Scale (ctx , roleSet , & role )
55+ for i := 0 ; i <= progressIndex ; i ++ {
56+ role := & roleSet .Spec . Roles [ i ]
57+ s , err := GetRoleSyncer (m .cli , role ).Scale (ctx , roleSet , role )
4658 if err != nil {
4759 return err
4860 }
4961 scaling = scaling || s
5062 }
5163 if scaling {
52- klog . Infof ( "[RollingManagerSequential.Next] waiting for roleset %s/%s to be scaled" , roleSet . Namespace , roleSet . Name )
64+ // Wait for Pods to be created and status updated
5365 return nil
5466 }
5567
56- // 2 . Sort roles by upgrade order
68+ // 3 . Sort roles by upgrade order
5769 klog .Infof ("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder" )
5870 sortedRoles := sortRolesByUpgradeOrder (roleSet .Spec .Roles )
5971 var sequenceLines []string
@@ -72,8 +84,12 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat
7284 roleSet .Name ,
7385 strings .Join (sequenceLines , "\n " ))
7486
75- // 3 . do the rollout process for each role by order
87+ // 4 . do the rollout process for each role by order
7688 for _ , role := range sortedRoles {
89+ if ! isRoleDependenciesReady (roleSet , & role ) {
90+ klog .Infof ("Rollout of role %s blocked: dependencies not ready. Stopping sequential update." , role .Name )
91+ break // enforce strict sequential order
92+ }
7793 klog .Infof ("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s" , roleSet .Namespace , roleSet .Name , role .Name )
7894 err := GetRoleSyncer (m .cli , & role ).Rollout (ctx , roleSet , & role )
7995 if err != nil {
@@ -95,29 +111,48 @@ type RollingManagerParallel struct {
95111}
96112
97113func (m * RollingManagerParallel ) Next (ctx context.Context , roleSet * orchestrationv1alpha1.RoleSet ) (err error ) {
98- // 1. ensure pod replica meet expectations
114+ // 1. Find the furthest role whose dependencies are ready
115+ progressIndex := - 1
116+ for i , role := range roleSet .Spec .Roles {
117+ if ! isRoleDependenciesReady (roleSet , & role ) {
118+ break
119+ }
120+ progressIndex = i
121+ }
122+
123+ if progressIndex == - 1 {
124+ return nil // even A is blocked (shouldn't happen)
125+ }
126+
127+ // 2: Scale all roles from 0 to progressIndex
99128 var scaling bool
100- for _ , role := range roleSet . Spec . Roles {
101- klog . Infof ( "[RollingManagerParallel.Next] start to scale roleset %s/%s role %s" , roleSet .Namespace , roleSet . Name , role . Name )
102- s , err := GetRoleSyncer (m .cli , & role ).Scale (ctx , roleSet , & role )
129+ for i := 0 ; i <= progressIndex ; i ++ {
130+ role := & roleSet .Spec . Roles [ i ]
131+ s , err := GetRoleSyncer (m .cli , role ).Scale (ctx , roleSet , role )
103132 if err != nil {
104133 return err
105134 }
106135 scaling = scaling || s
107136 }
108137 if scaling {
109- klog . Infof ( "[RollingManagerParallel.Next] waiting for roleset %s/%s to be scaled" , roleSet . Namespace , roleSet . Name )
110- return
138+ // Wait for Pods to be created and status updated
139+ return nil
111140 }
112- // 2. do the rollout process for each role
141+
142+ // 3. do the rollout process for each role
113143 for _ , role := range roleSet .Spec .Roles {
144+ if ! isRoleDependenciesReady (roleSet , & role ) {
145+ klog .Infof ("Skipping rollout for role %s: " +
146+ "dependencies not ready (parallel update continues for other roles)" , role .Name )
147+ continue // parallel mode, skip only this role
148+ }
114149 klog .Infof ("[RollingManagerParallel.Next] start to rollout roleset %s/%s role %s" , roleSet .Namespace , roleSet .Name , role .Name )
115150 err := GetRoleSyncer (m .cli , & role ).Rollout (ctx , roleSet , & role )
116151 if err != nil {
117152 return err
118153 }
119154 }
120- return
155+ return nil
121156}
122157
123158type RollingManagerInterleave struct {
@@ -127,28 +162,46 @@ type RollingManagerInterleave struct {
127162// Interleaved rollout: update roles in alternating steps,
128163// using (maxSurge + maxUnavailable) as the step size for all roles
129164func (m * RollingManagerInterleave ) Next (ctx context.Context , roleSet * orchestrationv1alpha1.RoleSet ) (err error ) {
130- // 1. ensure pod replica meet expectations
165+ // 1. Find the furthest role whose dependencies are ready
166+ progressIndex := - 1
167+ for i , role := range roleSet .Spec .Roles {
168+ if ! isRoleDependenciesReady (roleSet , & role ) {
169+ break
170+ }
171+ progressIndex = i
172+ }
173+
174+ if progressIndex == - 1 {
175+ return nil // even A is blocked (shouldn't happen)
176+ }
177+
178+ // 2. Scale all roles from 0 to progressIndex
131179 var scaling bool
132- for _ , role := range roleSet . Spec . Roles {
133- klog . Infof ( "[RollingManagerInterleave.Next] start to scale roleset %s/%s role %s" , roleSet .Namespace , roleSet . Name , role . Name )
134- s , err := GetRoleSyncer (m .cli , & role ).Scale (ctx , roleSet , & role )
180+ for i := 0 ; i <= progressIndex ; i ++ {
181+ role := & roleSet .Spec . Roles [ i ]
182+ s , err := GetRoleSyncer (m .cli , role ).Scale (ctx , roleSet , role )
135183 if err != nil {
136184 return err
137185 }
138186 scaling = scaling || s
139187 }
140188 if scaling {
141- klog . Infof ( "[RollingManagerInterleave.Next] waiting for roleset %s/%s to be scaled" , roleSet . Namespace , roleSet . Name )
189+ // Wait for Pods to be created and status updated
142190 return nil
143191 }
144- // 2 . do the rollout process for each role
192+ // 3 . do the rollout process for each role
145193 roleSteps := make (map [string ]int32 )
146194 roleReadyStatus := make (map [string ]bool )
147195 currentStep := int32 (math .MaxInt32 )
148196 allRolesReady := true
149197
150198 // Check the current step for each role and determine the minimum step across all roles as the global current step
151199 for _ , role := range roleSet .Spec .Roles {
200+ if ! isRoleDependenciesReady (roleSet , & role ) {
201+ klog .Infof ("Skipping role %s/%s: dependencies not ready (will retry in next reconcile)" ,
202+ roleSet .Namespace , role .Name )
203+ continue // interleaved mode: skip only this role, others may proceed
204+ }
152205 allReady , currentRoleStep , err := GetRoleSyncer (m .cli , & role ).CheckCurrentStep (ctx , roleSet , & role )
153206 if err != nil {
154207 klog .Errorf ("[RollingManagerInterleave.Next] Failed to get current step for role %s in roleset %s/%s: %v" , role .Name , roleSet .Namespace , roleSet .Name , err )
0 commit comments