Skip to content

feat: add ClusterPlugin interface + migrate Ray to shared clusters#7460

Draft
pingsutw wants to merge 18 commits into
mainfrom
feature/cluster-plugin-interface
Draft

feat: add ClusterPlugin interface + migrate Ray to shared clusters#7460
pingsutw wants to merge 18 commits into
mainfrom
feature/cluster-plugin-interface

Conversation

@pingsutw

Copy link
Copy Markdown
Member

Tracking issue

Related to shared-cluster execution for Ray (and future cluster-backed plugins).

Why are the changes needed?

Today every Ray task produces a RayJob with an embedded RayClusterSpec. KubeRay spins up a brand-new Ray cluster per task, runs the job, and tears the cluster down. Two Ray tasks with identical cluster configuration cannot share a cluster, so we pay full cluster cold-start cost on every task.

This PR introduces a generic ClusterPlugin model so a plugin can run a job on top of a shared, long-lived cluster keyed by the hash of the job-spec proto. Two tasks whose spec hashes to the same name reuse one cluster; the job is created only after the cluster is ready. Ray is migrated to this model as the first consumer.

What changes were proposed in this pull request?

1. New ClusterPlugin interface (flyteplugins/.../pluginmachinery/k8s/plugin.go)

Sits alongside the existing Plugin interface. Instead of one BuildResource, it declares two resources plus a readiness check:

type ClusterPlugin interface {
    GetClusterName(ctx, taskCtx) (string, error)               // deterministic name = hash(spec proto)
    BuildClusterResource(ctx, taskCtx) (client.Object, error)  // the shared cluster
    BuildClusterIdentityResource(ctx, taskMeta) (client.Object, error)
    IsClusterReady(ctx, pluginContext, cluster) (bool, error)
    BuildJobResource(ctx, taskCtx, clusterName) (client.Object, error)  // job bound to the cluster
    BuildJobIdentityResource(ctx, taskMeta) (client.Object, error)
    GetJobPhase(ctx, pluginContext, job) (pluginsCore.PhaseInfo, error)
    GetProperties() PluginProperties
}

PluginEntry gains ClusterPlugin + ClusterResourceToWatch; RegisterK8sPlugin validates exactly one of Plugin/ClusterPlugin is set.

2. New ClusterPluginManager (executor/pkg/plugin/k8s/cluster_plugin_manager.go)

A sibling to PluginManager that implements pluginsCore.Plugin and drives a 3-phase state machine:

NotStarted   ── create-or-reuse shared cluster (IsAlreadyExists tolerated) ─▶ ClusterWait
ClusterWait  ── IsClusterReady? ──no──▶ stay Initializing
                               ──yes─▶ submit job (ClusterSelector) ────────▶ JobStarted
JobStarted   ── GetJobPhase ──▶ Running / Success(+read outputs) / Failure

Key invariants:

  • The cluster is created without owner reference or finalizer so it outlives any single task execution (cleanup delegated to KubeRay's idle autoscaler / external GC).
  • The job is owned normally (owner ref + finalizer); Abort/Finalize act only on the job, never the shared cluster.
  • ClusterName is persisted in plugin state so it round-trips across reconcile rounds.

3. Architecture

                         ┌───────────────────────────────────────────────┐
                         │              ClusterPluginManager              │
 TaskAction (ray) ─────▶ │  Handle() state machine                       │
                         │                                               │
                         │  ┌─ NotStarted ─────────────────────────────┐ │
                         │  │ name = GetClusterName(spec)  (proto hash) │ │
                         │  │ GET cluster; if absent:                   │ │
                         │  │   BuildClusterResource → Create           │ │
                         │  │   (no ownerRef / no finalizer)            │ │
                         │  └──────────────────────────────────────────┘ │
                         │  ┌─ ClusterWait ────────────────────────────┐ │
                         │  │ IsClusterReady(cluster)?                  │ │
                         │  │   ready → BuildJobResource(name) → Create │ │
                         │  │          (ownerRef + finalizer)           │ │
                         │  └──────────────────────────────────────────┘ │
                         │  ┌─ JobStarted ─────────────────────────────┐ │
                         │  │ GetJobPhase(job) → phase + outputs        │ │
                         │  └──────────────────────────────────────────┘ │
                         └───────────────────────────────────────────────┘
        run A (spec X) ┐
                       ├─ hash(X) = ray-<h> ──▶ ONE RayCluster ray-<h>
        run B (spec X) ┘                          ▲   ▲
                                       RayJob A ───┘   └─── RayJob B
                                       (ClusterSelector: ray.io/cluster=ray-<h>)

4. Ray migration (flyteplugins/.../plugins/k8s/ray/ray.go)

  • GetClusterName: "ray-" + truncate(pbhash(RayJob proto)), DNS1123-safe. Identical configs → identical name.
  • BuildClusterResource: standalone RayCluster labeled ray.io/cluster=<name>, autoscaling enabled.
  • BuildJobResource: RayJob with RayClusterSpec=nil, ClusterSelector={ray.io/cluster: <name>}, ShutdownAfterJobFinishes=false (cluster must outlive the job).
  • GetJobPhase: the former GetTaskPhase body, unchanged.
  • Old Plugin methods (BuildResource, BuildIdentityResource, IsTerminal, GetCompletionTime) removed; registered via ClusterPlugin.

5. Two correctness fixes discovered during live testing

Because the Ray driver runs on the shared head pod, run identity could not come from the cluster pod's env (that env belongs to whichever task created the cluster). Two fixes make each job report under its own run:

  • Strip run-scoped env from the shared cluster podsRUN_NAME, ACTION_NAME, _U_RUN_BASE, FLYTE_INTERNAL_* are removed from the head/worker containers so the cluster is run-agnostic (user-supplied env preserved).
  • Inject per-job identity into the RayJob runtime_env.env_vars — Ray applies runtime_env to the driver process, so each job's driver sees its own RUN_NAME/_U_RUN_BASE/FLYTE_INTERNAL_* and reports under its own run. User-supplied runtime_env values are never overwritten.

Dataflow (per run)

flyte run ──▶ RunService creates TaskAction(ray)
   └─▶ ClusterPluginManager.Handle
        ├─ GetClusterName(spec)  ────────────▶ ray-<hash>   (same for identical specs)
        ├─ ensure RayCluster ray-<hash>      ─▶ KubeRay provisions head+workers (shared)
        ├─ IsClusterReady → ready
        └─ submit RayJob  (ClusterSelector=ray-<hash>,
                           runtime_env.env_vars = THIS run's identity)
                 └─▶ KubeRay binds job to existing cluster
                       └─▶ driver on head pod reads its OWN identity from runtime_env
                             └─▶ job runs → Complete → outputs read → TaskAction Completed

How was this patch tested?

Unit tests (go test ./flyteplugins/.../ray/... ./executor/pkg/plugin/...): registry validation, cluster/job split, readiness, deterministic naming, env stripping, and runtime_env injection (including "does not overwrite user-supplied env"). All green; go vet clean.

End-to-end in the local devbox (k3s + KubeRay v1.6.1):

Example (reusable_ray.py):

import asyncio, typing, ray
from flyteplugins.ray.task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
import flyte

@ray.remote
def f(x): return x * x

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=1)],
    runtime_env={"pip": ["numpy", "ty"]},
    enable_autoscaling=False,
)
image = (flyte.Image.from_debian_base(name="flyte")
         .with_pip_packages("ray[default]==2.46.0", "flyteplugins-ray"))
ray_env = flyte.TaskEnvironment(name="ray_env", plugin_config=ray_config, image=image,
                                resources=flyte.Resources(cpu=(2, 4), memory=("3000Mi", "5000Mi")))

@ray_env.task
async def hello_ray_nested(n: int = 3) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)

Ran it multiple times concurrently against the devbox. All runs succeeded on a single shared cluster, each with its own identity:

$ kubectl -n flyte get rayclusters,rayjobs
NAME                                     STATUS   AGE
raycluster.ray.io/ray-jw97ftk5ayrzerhm   ready    95s          # ← ONE shared cluster

NAME                                      JOB STATUS   RAY CLUSTER NAME       END TIME
rayjob.ray.io/rb9t6k2vcvgqlv4cc42n-a0-0   SUCCEEDED    ray-jw97ftk5ayrzerhm   ...:51Z
rayjob.ray.io/rgnnxfd7swdks6wh8kjz-a0-0   SUCCEEDED    ray-jw97ftk5ayrzerhm   ...:58Z
rayjob.ray.io/rkgt2ls7kdrtqkrsmx7b-a0-0   SUCCEEDED    ray-jw97ftk5ayrzerhm   ...:08Z

# Each RayJob's runtime_env carries its OWN run identity (the fix):
rb9t6k2vcvgqlv4cc42n-a0-0 → RUN_NAME=rb9t6k2vcvgqlv4cc42n  cluster=ray-jw97ftk5ayrzerhm
rgnnxfd7swdks6wh8kjz-a0-0 → RUN_NAME=rgnnxfd7swdks6wh8kjz  cluster=ray-jw97ftk5ayrzerhm
rkgt2ls7kdrtqkrsmx7b-a0-0 → RUN_NAME=rkgt2ls7kdrtqkrsmx7b  cluster=ray-jw97ftk5ayrzerhm

All TaskActions (and nested python children) reached Completed.

Labels

  • added (ClusterPlugin interface + manager, Ray shared-cluster support)
  • changed (Ray plugin migrated from embedded-cluster to shared-cluster model)

Known limitations / follow-ups

  • Event-watching (recent k8s object events on phase info) is not yet wired for the cluster manager — deferred follow-up.
  • Idle cluster cleanup relies on KubeRay autoscaler IdleTimeoutSeconds / external GC; the plugin never deletes a shared cluster.
  • GetJobPhase head-pod lookup uses {cluster}-head; KubeRay appends a suffix, so the dashboard/vscode readiness link can be slightly off (non-fatal; pre-existing).

Check all the applicable boxes

  • All new and existing tests passed.
  • All commits are signed-off.
  • I updated the documentation accordingly.

pingsutw added 18 commits May 29, 2026 01:43
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Add validation in RegisterK8sPlugin to support both legacy Plugin and new
ClusterPlugin shapes: exactly one must be set, ResourceToWatch is always
required, and ClusterPlugin additionally requires ClusterResourceToWatch.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
When a k8s plugin entry has a non-nil ClusterPlugin field, construct a
ClusterPluginManager instead of the standard PluginManager. Skip the
InitializeObjectEventWatcher call for cluster-scoped plugins (v1 limitation).
Standard plugin entries continue to use the existing PluginManager path.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Remove the unused `submitterDefaultResourceRequirements` var and
`buildSubmitterPodTemplate` function, which are no longer called
after the Ray plugin migrated to KubeRay's ClusterSelector. Also
remove the now-orphaned `k8s.io/apimachinery/pkg/api/resource` import.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
…ation

Signed-off-by: Kevin Su <pingsutw@apache.org>
A shared RayCluster is created once and reused by every task whose spec
hashes to the same name. The head/worker pods were carrying the creating
task's execution identity (RUN_NAME, ACTION_NAME, _U_RUN_BASE,
FLYTE_INTERNAL_*), so the Ray driver - which runs on the head pod - read
the wrong run's identity for every subsequent job, causing those jobs to
hang reporting under the first run.

Strip the run-scoped identity env vars from the cluster's head/worker
containers in BuildClusterResource so the cluster stays run-agnostic.
Per-job identity travels with the RayJob instead. User-supplied env is
preserved.

Signed-off-by: Kevin Su <pingsutw@apache.org>
With run-scoped env stripped from the shared cluster pods, the Ray driver
(which runs on the shared head node) had no execution identity. Inject
this job's RUN_NAME/ACTION_NAME/_U_RUN_BASE/FLYTE_INTERNAL_* into the
RayJob runtime_env so Ray applies them to the driver process. Each job
then reports under its own run instead of the cluster creator's, and the
SDK runtime's env fallback resolves the correct identity. User-supplied
runtime_env values are never overwritten.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Copilot AI review requested due to automatic review settings May 30, 2026 02:19

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new Kubernetes plugin model for “job-on-shared-cluster” execution (ClusterPlugin + ClusterPluginManager) and migrates the Ray plugin to use a long-lived, hash-keyed RayCluster with per-run RayJob submission. This enables multiple identical Ray task specs to reuse a single cluster and avoid per-task cold-start costs.

Changes:

  • Added ClusterPlugin interface and extended plugin registration to support exactly one of Plugin vs ClusterPlugin.
  • Implemented ClusterPluginManager in the executor to drive a cluster-create/wait + job-submit/watch state machine.
  • Migrated Ray plugin to split resources into RayCluster + RayJob, including deterministic cluster naming and per-job identity injection via runtime_env.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
flyteplugins/go/tasks/plugins/k8s/ray/ray.go Migrates Ray to shared-cluster model; adds deterministic cluster naming, env stripping, runtime_env identity injection, and job/cluster split.
flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go Updates Ray tests for the new cluster/job split and adds coverage for naming/readiness/runtime_env injection.
flyteplugins/go/tasks/pluginmachinery/registry.go Validates plugin registration for Plugin vs ClusterPlugin and enforces cluster resource watch type when needed.
flyteplugins/go/tasks/pluginmachinery/registry_test.go Adds unit tests for the new registry validation rules.
flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go Introduces the ClusterPlugin interface and extends PluginEntry with cluster resource watch metadata.
executor/setup.go Registers Ray + JobSet CRDs into the executor scheme and imports the Ray plugin package.
executor/pkg/plugin/registry.go Instantiates ClusterPluginManager for cluster plugins and PluginManager for legacy plugins.
executor/pkg/plugin/k8s/plugin_manager.go Refactors metadata injection to support injecting name/ownership conditionally (used by shared cluster resources).
executor/pkg/plugin/k8s/cluster_plugin_manager.go Adds the new executor-side manager and its persisted state machine.
executor/pkg/plugin/k8s/cluster_plugin_manager_test.go Adds unit tests validating the cluster/job lifecycle and that abort/finalize only act on the job.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +328 to 331
// The cluster is shared across tasks, so its pods must not carry any single task's execution
// identity. Per-job identity travels with the RayJob instead.
stripRunScopedEnvVars(&headPodTemplate.Spec)

Comment on lines +113 to +117
func (m *ClusterPluginManager) addClusterMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig, clusterName string) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels())))
o.SetName(sanitizeName(clusterName))
Comment on lines +277 to +283
jobSpec := rayv1.RayJobSpec{
// The cluster is shared and standalone; the job must not own or shut it down.
RayClusterSpec: nil,
ClusterSelector: map[string]string{rayClusterLabelKey: clusterName},
Entrypoint: entrypoint,
RuntimeEnvYAML: runtimeEnvYaml,
ShutdownAfterJobFinishes: false,
Comment on lines +122 to +129
// GetClusterName returns a deterministic name for the cluster backing this task. Implementations
// typically hash the task's plugin-spec proto so that identical specs collapse onto the same
// cluster. The framework sanitizes the result into a DNS1123 subdomain before use.
GetClusterName(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (string, error)

// BuildClusterResource builds the full cluster object (e.g. a RayCluster) that will be posted to
// k8s. Name and namespace are assigned by the framework from GetClusterName.
BuildClusterResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error)
Comment on lines +328 to 331
// The cluster is shared across tasks, so its pods must not carry any single task's execution
// identity. Per-job identity travels with the RayJob instead.
stripRunScopedEnvVars(&headPodTemplate.Spec)

Comment on lines +113 to +117
func (m *ClusterPluginManager) addClusterMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig, clusterName string) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels())))
o.SetName(sanitizeName(clusterName))
Comment on lines +277 to +283
jobSpec := rayv1.RayJobSpec{
// The cluster is shared and standalone; the job must not own or shut it down.
RayClusterSpec: nil,
ClusterSelector: map[string]string{rayClusterLabelKey: clusterName},
Entrypoint: entrypoint,
RuntimeEnvYAML: runtimeEnvYaml,
ShutdownAfterJobFinishes: false,
Comment on lines +122 to +129
// GetClusterName returns a deterministic name for the cluster backing this task. Implementations
// typically hash the task's plugin-spec proto so that identical specs collapse onto the same
// cluster. The framework sanitizes the result into a DNS1123 subdomain before use.
GetClusterName(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (string, error)

// BuildClusterResource builds the full cluster object (e.g. a RayCluster) that will be posted to
// k8s. Name and namespace are assigned by the framework from GetClusterName.
BuildClusterResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error)
@pingsutw pingsutw self-assigned this May 30, 2026
@pingsutw pingsutw added this to the V2 GA milestone May 30, 2026
@pingsutw pingsutw marked this pull request as draft June 2, 2026 00:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants