Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions api/orchestration/v1alpha1/roleset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ type RoleSpec struct {
// +optional
Replicas *int32 `json:"replicas,omitempty"`

// Dependencies specifies the list of role names that must be ready before this role starts or is updated.
// This is used to define a dependency graph between different roles, ensuring correct ordering during
// both initial scale-up and rolling updates. Each element in the slice should correspond to the name
// of another role defined within the same StormService specification.
//
// For example, if RoleA depends on RoleB and RoleC, then RoleA's Dependencies would look like:
// Dependencies: []string{"RoleB", "RoleC"}
//
// - **During scale-up (initial startup or scaling from zero):**
// RoleA will not begin creating Pods until all its dependencies (e.g., RoleB and RoleC) have reached
// their desired number of ready replicas.
//
// - **During rolling updates:**
// RoleA will not proceed to update its existing Pods if any of its dependencies are not currently ready.
// This includes cases where a dependency is failing, has zero ready replicas, or is itself undergoing an update.
// Note: Controller **does not automatically trigger updates** on dependent roles (e.g., RoleB or RoleC).
// It only checks whether they are currently ready. You must explicitly update dependencies if needed.
//
// In both cases, if a dependency becomes unready at any time, RoleA’s scale-up or rollout will be paused
// until all dependencies are satisfied again.
//
// +optional
Dependencies []string `json:"dependencies,omitempty"`

// UpgradeOrder specifies the order in which this role should be upgraded.
// Lower values are upgraded first. If not specified, roles upgrade after all explicitly ordered roles.
// +optional
Expand Down
5 changes: 5 additions & 0 deletions api/orchestration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ spec:
roles:
items:
properties:
dependencies:
items:
type: string
type: array
disruptionTolerance:
properties:
maxUnavailable:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ spec:
roles:
items:
properties:
dependencies:
items:
type: string
type: array
disruptionTolerance:
properties:
maxUnavailable:
Expand Down
4 changes: 4 additions & 0 deletions dist/chart/crds/orchestration.aibrix.ai_rolesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ spec:
roles:
items:
properties:
dependencies:
items:
type: string
type: array
disruptionTolerance:
properties:
maxUnavailable:
Expand Down
4 changes: 4 additions & 0 deletions dist/chart/crds/orchestration.aibrix.ai_stormservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ spec:
roles:
items:
properties:
dependencies:
items:
type: string
type: array
disruptionTolerance:
properties:
maxUnavailable:
Expand Down
11 changes: 11 additions & 0 deletions pkg/client/applyconfiguration/orchestration/v1alpha1/rolespec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 77 additions & 24 deletions pkg/controller/roleset/rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"math"
"strings"

orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"

"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/client"

orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
)

type RollingManager interface {
Expand All @@ -38,22 +37,35 @@ type RollingManagerSequential struct {
}

func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
// 1. ensure pod replica meet expectations
// 1. Find the furthest role whose dependencies are ready
progressIndex := -1
for i, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
break
Comment on lines +42 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using &role where role is a for...range loop variable is a common pitfall in Go. It takes the address of the loop variable, which is reused across iterations. While it might work here because it's used within the loop's scope, it's safer and clearer to get a pointer to the slice element directly to avoid potential bugs.

Additionally, this block of code for dependency checking and scaling is duplicated across RollingManagerSequential, RollingManagerParallel, and RollingManagerInterleave. Consider refactoring it into a shared helper function to improve maintainability and avoid repeating the same logic.

Suggested change
for i, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
break
for i := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &roleSet.Spec.Roles[i]) {
break

}
progressIndex = i
}

if progressIndex == -1 {
return nil // even A is blocked (shouldn't happen)
}

// 2. Scale all roles from 0 to progressIndex
var scaling bool
for _, role := range roleSet.Spec.Roles {
klog.Infof("[RollingManagerSequential.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
for i := 0; i <= progressIndex; i++ {
role := &roleSet.Spec.Roles[i]
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
if err != nil {
return err
}
scaling = scaling || s
}
if scaling {
klog.Infof("[RollingManagerSequential.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
// Wait for Pods to be created and status updated
return nil
}

// 2. Sort roles by upgrade order
// 3. Sort roles by upgrade order
klog.Infof("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder")
sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles)
var sequenceLines []string
Expand All @@ -72,8 +84,12 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat
roleSet.Name,
strings.Join(sequenceLines, "\n"))

// 3. do the rollout process for each role by order
// 4. do the rollout process for each role by order
for _, role := range sortedRoles {
if !isRoleDependenciesReady(roleSet, &role) {
klog.Infof("Rollout of role %s blocked: dependencies not ready. Stopping sequential update.", role.Name)
break // enforce strict sequential order
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sequential mode so we break

}
klog.Infof("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role)
if err != nil {
Expand All @@ -95,29 +111,48 @@ type RollingManagerParallel struct {
}

func (m *RollingManagerParallel) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
// 1. ensure pod replica meet expectations
// 1. Find the furthest role whose dependencies are ready
progressIndex := -1
for i, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
break
Comment on lines +116 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to my other comment, using &role with a for...range loop variable can be error-prone. It's better to reference the slice element directly to get a stable pointer.

Suggested change
for i, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
break
for i := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &roleSet.Spec.Roles[i]) {
break

}
progressIndex = i
}

if progressIndex == -1 {
return nil // even A is blocked (shouldn't happen)
}

// 2: Scale all roles from 0 to progressIndex
var scaling bool
for _, role := range roleSet.Spec.Roles {
klog.Infof("[RollingManagerParallel.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
for i := 0; i <= progressIndex; i++ {
role := &roleSet.Spec.Roles[i]
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
if err != nil {
return err
}
scaling = scaling || s
}
if scaling {
klog.Infof("[RollingManagerParallel.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
return
// Wait for Pods to be created and status updated
return nil
}
// 2. do the rollout process for each role

// 3. do the rollout process for each role
for _, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
klog.Infof("Skipping rollout for role %s: "+
"dependencies not ready (parallel update continues for other roles)", role.Name)
continue // parallel mode, skip only this role
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parallel mode so we continue

}
klog.Infof("[RollingManagerParallel.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role)
if err != nil {
return err
}
}
return
return nil
}

type RollingManagerInterleave struct {
Expand All @@ -127,28 +162,46 @@ type RollingManagerInterleave struct {
// Interleaved rollout: update roles in alternating steps,
// using (maxSurge + maxUnavailable) as the step size for all roles
func (m *RollingManagerInterleave) Next(ctx context.Context, roleSet *orchestrationv1alpha1.RoleSet) (err error) {
// 1. ensure pod replica meet expectations
// 1. Find the furthest role whose dependencies are ready
progressIndex := -1
for i, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
break
Comment on lines +167 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to my other comment, using &role with a for...range loop variable can be error-prone. It's better to reference the slice element directly to get a stable pointer.

Suggested change
for i, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
break
for i := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &roleSet.Spec.Roles[i]) {
break

}
progressIndex = i
}

if progressIndex == -1 {
return nil // even A is blocked (shouldn't happen)
}

// 2. Scale all roles from 0 to progressIndex
var scaling bool
for _, role := range roleSet.Spec.Roles {
klog.Infof("[RollingManagerInterleave.Next] start to scale roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name)
s, err := GetRoleSyncer(m.cli, &role).Scale(ctx, roleSet, &role)
for i := 0; i <= progressIndex; i++ {
role := &roleSet.Spec.Roles[i]
s, err := GetRoleSyncer(m.cli, role).Scale(ctx, roleSet, role)
if err != nil {
return err
}
scaling = scaling || s
}
if scaling {
klog.Infof("[RollingManagerInterleave.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name)
// Wait for Pods to be created and status updated
return nil
}
// 2. do the rollout process for each role
// 3. do the rollout process for each role
roleSteps := make(map[string]int32)
roleReadyStatus := make(map[string]bool)
currentStep := int32(math.MaxInt32)
allRolesReady := true

// Check the current step for each role and determine the minimum step across all roles as the global current step
for _, role := range roleSet.Spec.Roles {
if !isRoleDependenciesReady(roleSet, &role) {
klog.Infof("Skipping role %s/%s: dependencies not ready (will retry in next reconcile)",
roleSet.Namespace, role.Name)
continue // interleaved mode: skip only this role, others may proceed
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interleaved mode so we continue

}
allReady, currentRoleStep, err := GetRoleSyncer(m.cli, &role).CheckCurrentStep(ctx, roleSet, &role)
if err != nil {
klog.Errorf("[RollingManagerInterleave.Next] Failed to get current step for role %s in roleset %s/%s: %v", role.Name, roleSet.Namespace, roleSet.Name, err)
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/roleset/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,35 @@ func sortRolesByUpgradeOrder(roles []orchestrationv1alpha1.RoleSpec) []orchestra
})
return sortedRoles
}

func isRoleDependenciesReady(roleSet *orchestrationv1alpha1.RoleSet, role *orchestrationv1alpha1.RoleSpec) bool {
if len(role.Dependencies) == 0 {
return true
}

// Initialize all roles to 0
roleStatusMap := make(map[string]int32)
for _, r := range roleSet.Spec.Roles {
roleStatusMap[r.Name] = 0
}
for _, rs := range roleSet.Status.Roles {
roleStatusMap[rs.Name] = rs.ReadyReplicas
}

for _, depName := range role.Dependencies {
depReady := roleStatusMap[depName]
expected := int32(1)
for _, r := range roleSet.Spec.Roles {
if r.Name == depName && r.Replicas != nil {
expected = *r.Replicas
break
}
}
if depReady < expected {
klog.V(4).Infof("Role %s depends on %s, "+
"but only %d/%d ready", role.Name, depName, depReady, expected)
return false
}
}
return true
}
Loading