feat: add ClusterPlugin interface + migrate Ray to shared clusters#7460
Draft
pingsutw wants to merge 18 commits into
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Add validation in RegisterK8sPlugin to support both legacy Plugin and new ClusterPlugin shapes: exactly one must be set, ResourceToWatch is always required, and ClusterPlugin additionally requires ClusterResourceToWatch. Signed-off-by: Kevin Su <pingsutw@apache.org>
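The three rules in this commit message can be sketched as a small standalone check. This is only an illustration: the `entry` struct and `validateEntry` function are hypothetical stand-ins, not the real `PluginEntry` type or the `RegisterK8sPlugin` implementation, and only the field names mentioned above are modeled.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical stand-in for PluginEntry; fields are untyped
// placeholders, with nil meaning "not set".
type entry struct {
	Plugin                 any // legacy plugin shape
	ClusterPlugin          any // new cluster plugin shape
	ResourceToWatch        any // always required
	ClusterResourceToWatch any // required only alongside ClusterPlugin
}

// validateEntry applies the three rules from the commit message.
func validateEntry(e entry) error {
	hasPlugin, hasCluster := e.Plugin != nil, e.ClusterPlugin != nil
	if hasPlugin == hasCluster {
		// Both set or both unset: exactly one is required.
		return errors.New("exactly one of Plugin or ClusterPlugin must be set")
	}
	if e.ResourceToWatch == nil {
		return errors.New("ResourceToWatch is required")
	}
	if hasCluster && e.ClusterResourceToWatch == nil {
		return errors.New("ClusterPlugin requires ClusterResourceToWatch")
	}
	return nil
}

func main() {
	fmt.Println(validateEntry(entry{Plugin: "p", ResourceToWatch: "job"}))
	fmt.Println(validateEntry(entry{Plugin: "p", ClusterPlugin: "c", ResourceToWatch: "job"}))
	fmt.Println(validateEntry(entry{ClusterPlugin: "c", ResourceToWatch: "job"}))
}
```

Comparing the two booleans with `==` rejects both the "neither set" and "both set" cases in one branch.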
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
When a k8s plugin entry has a non-nil ClusterPlugin field, construct a ClusterPluginManager instead of the standard PluginManager. Skip the InitializeObjectEventWatcher call for cluster-scoped plugins (v1 limitation). Standard plugin entries continue to use the existing PluginManager path. Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Remove the unused `submitterDefaultResourceRequirements` var and `buildSubmitterPodTemplate` function, which are no longer called after the Ray plugin migrated to KubeRay's ClusterSelector. Also remove the now-orphaned `k8s.io/apimachinery/pkg/api/resource` import. Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
…ation Signed-off-by: Kevin Su <pingsutw@apache.org>
A shared RayCluster is created once and reused by every task whose spec hashes to the same name. The head/worker pods were carrying the creating task's execution identity (RUN_NAME, ACTION_NAME, _U_RUN_BASE, FLYTE_INTERNAL_*), so the Ray driver - which runs on the head pod - read the wrong run's identity for every subsequent job, causing those jobs to hang reporting under the first run. Strip the run-scoped identity env vars from the cluster's head/worker containers in BuildClusterResource so the cluster stays run-agnostic. Per-job identity travels with the RayJob instead. User-supplied env is preserved. Signed-off-by: Kevin Su <pingsutw@apache.org>
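A minimal sketch of the stripping step, assuming the `FLYTE_INTERNAL_*` pattern is a plain prefix match. The `EnvVar` type is a local stand-in for a container env entry and `stripRunScoped` is a hypothetical helper, not the PR's `stripRunScopedEnvVars`; `FLYTE_INTERNAL_FOO` and `MY_SETTING` are made-up names for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// EnvVar is a local stand-in for a container env entry (name and value only).
type EnvVar struct{ Name, Value string }

// isRunScoped reports whether name is one of the run-scoped identity
// variables listed in the commit message. The FLYTE_INTERNAL_ prefix match
// is an assumption about how the wildcard is interpreted.
func isRunScoped(name string) bool {
	switch name {
	case "RUN_NAME", "ACTION_NAME", "_U_RUN_BASE":
		return true
	}
	return strings.HasPrefix(name, "FLYTE_INTERNAL_")
}

// stripRunScoped returns env without run-scoped entries, keeping the order
// of everything else (user-supplied env is preserved).
func stripRunScoped(env []EnvVar) []EnvVar {
	kept := make([]EnvVar, 0, len(env))
	for _, e := range env {
		if !isRunScoped(e.Name) {
			kept = append(kept, e)
		}
	}
	return kept
}

func main() {
	env := []EnvVar{
		{"RUN_NAME", "r1"},
		{"MY_SETTING", "x"}, // hypothetical user-supplied variable
		{"FLYTE_INTERNAL_FOO", "y"},
	}
	fmt.Println(stripRunScoped(env))
}
```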
With run-scoped env stripped from the shared cluster pods, the Ray driver (which runs on the shared head node) had no execution identity. Inject this job's RUN_NAME/ACTION_NAME/_U_RUN_BASE/FLYTE_INTERNAL_* into the RayJob runtime_env so Ray applies them to the driver process. Each job then reports under its own run instead of the cluster creator's, and the SDK runtime's env fallback resolves the correct identity. User-supplied runtime_env values are never overwritten. Signed-off-by: Kevin Su <pingsutw@apache.org>
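The "never overwrite user-supplied values" rule can be sketched as a merge that only fills missing keys. This assumes the runtime environment is held as a map whose `env_vars` entry is a `map[string]string`; the `injectIdentity` helper and that representation are illustrative assumptions, not the plugin's actual code, which works on the RayJob's runtime_env YAML.

```go
package main

import "fmt"

// injectIdentity merges identity into runtimeEnv["env_vars"], skipping any
// key the user already set. The map layout is an assumption for this sketch.
func injectIdentity(runtimeEnv map[string]any, identity map[string]string) error {
	envVars := map[string]string{}
	if raw, present := runtimeEnv["env_vars"]; present {
		typed, ok := raw.(map[string]string)
		if !ok {
			return fmt.Errorf("env_vars has unexpected type %T", raw)
		}
		// Start from the user's values so they always win.
		for k, v := range typed {
			envVars[k] = v
		}
	}
	for k, v := range identity {
		if _, userSet := envVars[k]; !userSet {
			envVars[k] = v
		}
	}
	runtimeEnv["env_vars"] = envVars
	return nil
}

func main() {
	runtimeEnv := map[string]any{
		"env_vars": map[string]string{"RUN_NAME": "user-chosen"},
	}
	identity := map[string]string{"RUN_NAME": "run-123", "ACTION_NAME": "a0"}
	if err := injectIdentity(runtimeEnv, identity); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(runtimeEnv["env_vars"])
}
```

In this example the user's `RUN_NAME` is kept and only `ACTION_NAME` is added.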
Signed-off-by: Kevin Su <pingsutw@apache.org>
Contributor
Pull request overview
This PR introduces a new Kubernetes plugin model for “job-on-shared-cluster” execution (ClusterPlugin + ClusterPluginManager) and migrates the Ray plugin to use a long-lived, hash-keyed RayCluster with per-run RayJob submission. This enables multiple identical Ray task specs to reuse a single cluster and avoid per-task cold-start costs.
Changes:
- Added `ClusterPlugin` interface and extended plugin registration to support exactly one of `Plugin` vs `ClusterPlugin`.
- Implemented `ClusterPluginManager` in the executor to drive a cluster-create/wait + job-submit/watch state machine.
- Migrated Ray plugin to split resources into `RayCluster` + `RayJob`, including deterministic cluster naming and per-job identity injection via `runtime_env`.
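The cluster-create/wait + job-submit/watch flow named in the list above can be sketched as a tiny persisted state machine. The phase names, the `state` and `observed` types, and the `step` function are all hypothetical; the text here does not show the real phases or state layout of `ClusterPluginManager`, so treat this as one plausible shape rather than the implementation.

```go
package main

import "fmt"

type phase int

// Hypothetical phase names for illustration only.
const (
	phaseEnsureCluster phase = iota // create the cluster if absent, wait until ready
	phaseSubmitJob                  // cluster is ready: create the job
	phaseWatchJob                   // job exists: poll until it finishes
)

// state is what would be persisted between reconcile rounds.
type state struct {
	Phase       phase
	ClusterName string // kept in state so it survives across rounds
}

// observed is what a single reconcile round sees (hypothetical inputs).
type observed struct {
	ClusterReady bool
	JobDone      bool
}

// step advances the state by at most one phase per round and reports
// whether the task is finished.
func step(s state, o observed) (state, bool) {
	switch s.Phase {
	case phaseEnsureCluster:
		if o.ClusterReady {
			s.Phase = phaseSubmitJob
		}
	case phaseSubmitJob:
		s.Phase = phaseWatchJob
	case phaseWatchJob:
		return s, o.JobDone
	}
	return s, false
}

func main() {
	s := state{Phase: phaseEnsureCluster, ClusterName: "ray-example"}
	rounds := []observed{{}, {ClusterReady: true}, {ClusterReady: true}, {ClusterReady: true, JobDone: true}}
	for i, o := range rounds {
		var done bool
		s, done = step(s, o)
		fmt.Printf("round %d: phase=%d done=%v\n", i, s.Phase, done)
	}
}
```

Advancing one phase per round keeps each reconcile idempotent: a crash between rounds resumes from the last persisted phase.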
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| flyteplugins/go/tasks/plugins/k8s/ray/ray.go | Migrates Ray to shared-cluster model; adds deterministic cluster naming, env stripping, runtime_env identity injection, and job/cluster split. |
| flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go | Updates Ray tests for the new cluster/job split and adds coverage for naming/readiness/runtime_env injection. |
| flyteplugins/go/tasks/pluginmachinery/registry.go | Validates plugin registration for Plugin vs ClusterPlugin and enforces cluster resource watch type when needed. |
| flyteplugins/go/tasks/pluginmachinery/registry_test.go | Adds unit tests for the new registry validation rules. |
| flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go | Introduces the ClusterPlugin interface and extends PluginEntry with cluster resource watch metadata. |
| executor/setup.go | Registers Ray + JobSet CRDs into the executor scheme and imports the Ray plugin package. |
| executor/pkg/plugin/registry.go | Instantiates ClusterPluginManager for cluster plugins and PluginManager for legacy plugins. |
| executor/pkg/plugin/k8s/plugin_manager.go | Refactors metadata injection to support injecting name/ownership conditionally (used by shared cluster resources). |
| executor/pkg/plugin/k8s/cluster_plugin_manager.go | Adds the new executor-side manager and its persisted state machine. |
| executor/pkg/plugin/k8s/cluster_plugin_manager_test.go | Adds unit tests validating the cluster/job lifecycle and that abort/finalize only act on the job. |
Comment on lines +328 to 331

    // The cluster is shared across tasks, so its pods must not carry any single task's execution
    // identity. Per-job identity travels with the RayJob instead.
    stripRunScopedEnvVars(&headPodTemplate.Spec)

Comment on lines +113 to +117

    func (m *ClusterPluginManager) addClusterMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig, clusterName string) {
        o.SetNamespace(taskCtx.GetNamespace())
        o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations())))
        o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels())))
        o.SetName(sanitizeName(clusterName))

Comment on lines +277 to +283

    jobSpec := rayv1.RayJobSpec{
        // The cluster is shared and standalone; the job must not own or shut it down.
        RayClusterSpec:           nil,
        ClusterSelector:          map[string]string{rayClusterLabelKey: clusterName},
        Entrypoint:               entrypoint,
        RuntimeEnvYAML:           runtimeEnvYaml,
        ShutdownAfterJobFinishes: false,

Comment on lines +122 to +129

    // GetClusterName returns a deterministic name for the cluster backing this task. Implementations
    // typically hash the task's plugin-spec proto so that identical specs collapse onto the same
    // cluster. The framework sanitizes the result into a DNS1123 subdomain before use.
    GetClusterName(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (string, error)

    // BuildClusterResource builds the full cluster object (e.g. a RayCluster) that will be posted to
    // k8s. Name and namespace are assigned by the framework from GetClusterName.
    BuildClusterResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error)

Tracking issue
Related to shared-cluster execution for Ray (and future cluster-backed plugins).
Why are the changes needed?
Today every Ray task produces a `RayJob` with an embedded `RayClusterSpec`. KubeRay spins up a brand-new Ray cluster per task, runs the job, and tears the cluster down. Two Ray tasks with identical cluster configuration cannot share a cluster, so we pay full cluster cold-start cost on every task.

This PR introduces a generic `ClusterPlugin` model so a plugin can run a job on top of a shared, long-lived cluster keyed by the hash of the job-spec proto. Two tasks whose spec hashes to the same name reuse one cluster; the job is created only after the cluster is ready. Ray is migrated to this model as the first consumer.

What changes were proposed in this pull request?
1. New `ClusterPlugin` interface (`flyteplugins/.../pluginmachinery/k8s/plugin.go`)

Sits alongside the existing `Plugin` interface. Instead of one `BuildResource`, it declares two resources plus a readiness check:

`PluginEntry` gains `ClusterPlugin` + `ClusterResourceToWatch`; `RegisterK8sPlugin` validates exactly one of `Plugin`/`ClusterPlugin` is set.

2. New `ClusterPluginManager` (`executor/pkg/plugin/k8s/cluster_plugin_manager.go`)

A sibling to `PluginManager` that implements `pluginsCore.Plugin` and drives a 3-phase state machine:

Key invariants:

- `Abort`/`Finalize` act only on the job, never the shared cluster.
- `ClusterName` is persisted in plugin state so it round-trips across reconcile rounds.

3. Architecture

4. Ray migration (`flyteplugins/.../plugins/k8s/ray/ray.go`)

- `GetClusterName`: `"ray-" + truncate(pbhash(RayJob proto))`, DNS1123-safe. Identical configs → identical name.
- `BuildClusterResource`: standalone `RayCluster` labeled `ray.io/cluster=<name>`, autoscaling enabled.
- `BuildJobResource`: `RayJob` with `RayClusterSpec=nil`, `ClusterSelector={ray.io/cluster: <name>}`, `ShutdownAfterJobFinishes=false` (cluster must outlive the job).
- `GetJobPhase`: the former `GetTaskPhase` body, unchanged.
- `Plugin` methods (`BuildResource`, `BuildIdentityResource`, `IsTerminal`, `GetCompletionTime`) removed; registered via `ClusterPlugin`.

5. Two correctness fixes discovered during live testing

Because the Ray driver runs on the shared head pod, run identity could not come from the cluster pod's env (that env belongs to whichever task created the cluster). Two fixes make each job report under its own run:

- `RUN_NAME`, `ACTION_NAME`, `_U_RUN_BASE`, `FLYTE_INTERNAL_*` are removed from the head/worker containers so the cluster is run-agnostic (user-supplied env preserved).
- `runtime_env.env_vars`: Ray applies `runtime_env` to the driver process, so each job's driver sees its own `RUN_NAME`/`_U_RUN_BASE`/`FLYTE_INTERNAL_*` and reports under its own run. User-supplied `runtime_env` values are never overwritten.

Dataflow (per run)
How was this patch tested?
Unit tests (`go test ./flyteplugins/.../ray/... ./executor/pkg/plugin/...`): registry validation, cluster/job split, readiness, deterministic naming, env stripping, and runtime_env injection (including "does not overwrite user-supplied env"). All green; `go vet` clean.

End-to-end in the local devbox (k3s + KubeRay v1.6.1):

Example (`reusable_ray.py`):

Ran it multiple times concurrently against the devbox. All runs succeeded on a single shared cluster, each with its own identity:

All TaskActions (and nested python children) reached `Completed`.

Labels
Known limitations / follow-ups
- `IdleTimeoutSeconds` / external GC; the plugin never deletes a shared cluster.
- `GetJobPhase` head-pod lookup uses `{cluster}-head`; KubeRay appends a suffix, so the dashboard/vscode readiness link can be slightly off (non-fatal; pre-existing).

Check all the applicable boxes