diff --git a/components/renku_data_services/crc/api.spec.yaml b/components/renku_data_services/crc/api.spec.yaml index 3b77a98d0..8ba9e5430 100644 --- a/components/renku_data_services/crc/api.spec.yaml +++ b/components/renku_data_services/crc/api.spec.yaml @@ -550,7 +550,7 @@ paths: content: "application/json": schema: - $ref: "#/components/schemas/K8sLabelList" + $ref: "#/components/schemas/Tolerations" default: $ref: "#/components/responses/Error" tags: @@ -1075,7 +1075,7 @@ components: default: $ref: "#/components/schemas/DefaultFlag" tolerations: - $ref: "#/components/schemas/K8sLabelList" + $ref: "#/components/schemas/Tolerations" node_affinities: $ref: "#/components/schemas/NodeAffinityList" required: ["cpu", "memory", "gpu", "max_storage", "name", "default", "default_storage"] @@ -1106,7 +1106,7 @@ components: default: $ref: "#/components/schemas/DefaultFlagPatch" tolerations: - $ref: "#/components/schemas/K8sLabelList" + $ref: "#/components/schemas/Tolerations" node_affinities: $ref: "#/components/schemas/NodeAffinityList" example: @@ -1133,7 +1133,7 @@ components: default: $ref: "#/components/schemas/DefaultFlagPatch" tolerations: - $ref: "#/components/schemas/K8sLabelList" + $ref: "#/components/schemas/Tolerations" node_affinities: $ref: "#/components/schemas/NodeAffinityList" required: ["id"] @@ -1161,7 +1161,7 @@ components: default: $ref: "#/components/schemas/DefaultFlag" tolerations: - $ref: "#/components/schemas/K8sLabelList" + $ref: "#/components/schemas/Tolerations" node_affinities: $ref: "#/components/schemas/NodeAffinityList" required: ["cpu", "memory", "gpu", "max_storage", "name", "id", "default", "default_storage"] @@ -1197,7 +1197,7 @@ components: matching: type: boolean tolerations: - $ref: "#/components/schemas/K8sLabelList" + $ref: "#/components/schemas/Tolerations" node_affinities: $ref: "#/components/schemas/NodeAffinityList" required: ["cpu", "memory", "gpu", "max_storage", "name", "id", "default", "default_storage"] @@ -1800,15 
+1800,65 @@ components: description: A threshold in seconds after which a session gets culled/deleted (0 means no threshold) minimum: 0 maximum: 2147483647 - K8sLabelList: + Tolerations: type: array - description: A list of k8s labels used for tolerations + description: | + A list of toleration items used for pod scheduling in Kubernetes. items: - $ref: "#/components/schemas/K8sLabel" - example: ["test-label-1"] - uniqueItems: true + $ref: "#/components/schemas/Toleration" default: [] - minItems: 0 + Toleration: + type: object + description: | + A toleration item. + The pod this Toleration is attached to tolerates any taint + that matches the triple <key,value,effect> + using the matching operator <operator>. + properties: + effect: + description: |- + Effect indicates the taint effect to match. Empty means match all taint effects. + When specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute. + type: string + key: + description: |- + Key is the taint key that the toleration applies to. Empty means match all taint keys. + If the key is empty, operator must be Exists; this combination means to match all values and all keys. + type: string + operator: + description: |- + Operator represents a key's relationship to the value. + Valid operators are Exists and Equal. Defaults to Equal. + Exists is equivalent to wildcard for value, so that a pod can + tolerate all taints of a particular category. + type: string + tolerationSeconds: + description: |- + TolerationSeconds represents the period of time the toleration (which must be + of effect NoExecute, otherwise this field is ignored) tolerates the taint. By default, + it is not set, which means tolerate the taint forever (do not evict). Zero and + negative values will be treated as 0 (evict immediately) by the system. + format: int64 + type: integer + value: + description: |- + Value is the taint value the toleration matches to. + If the operator is Exists, the value should be empty, otherwise just a regular string.
+ type: string + example: + effect: "NoSchedule" + key: "renku.io/dedicated" + operator: "Equal" + value: "user" + # K8sLabelList: + # type: array + # description: A list of k8s labels used for tolerations + # items: + # $ref: "#/components/schemas/K8sLabel" + # example: ["test-label-1"] + # uniqueItems: true + # default: [] + # minItems: 0 K8sLabel: type: string description: A valid K8s label diff --git a/components/renku_data_services/crc/apispec.py b/components/renku_data_services/crc/apispec.py index 26b1d7cbf..4b2db2540 100644 --- a/components/renku_data_services/crc/apispec.py +++ b/components/renku_data_services/crc/apispec.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: api.spec.yaml -# timestamp: 2025-10-13T09:06:36+00:00 +# timestamp: 2025-10-31T14:40:52+00:00 from __future__ import annotations @@ -88,14 +88,26 @@ class RemoteConfigurationFirecrestPatch(BaseAPISpec): ) -class K8sLabel(RootModel[str]): - root: str = Field( - ..., - description="A valid K8s label", - examples=["some-label-1"], - max_length=63, - min_length=3, - pattern="^[a-z0-9A-Z][a-z0-9A-Z-_./]*[a-z0-9A-Z]$", +class Toleration(BaseAPISpec): + effect: Optional[str] = Field( + None, + description="Effect indicates the taint effect to match. Empty means match all taint effects.\nWhen specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute.", + ) + key: Optional[str] = Field( + None, + description="Key is the taint key that the toleration applies to. Empty means match all taint keys.\nIf the key is empty, operator must be Exists; this combination means to match all values and all keys.", + ) + operator: Optional[str] = Field( + None, + description="Operator represents a key's relationship to the value.\nValid operators are Exists and Equal. 
Defaults to Equal.\nExists is equivalent to wildcard for value, so that a pod can\ntolerate all taints of a particular category.", + ) + tolerationSeconds: Optional[int] = Field( + None, + description="TolerationSeconds represents the period of time the toleration (which must be\nof effect NoExecute, otherwise this field is ignored) tolerates the taint. By default,\nit is not set, which means tolerate the taint forever (do not evict). Zero and\nnegative values will be treated as 0 (evict immediately) by the system.", + ) + value: Optional[str] = Field( + None, + description="Value is the taint value the toleration matches to.\nIf the operator is Exists, the value should be empty, otherwise just a regular string.", ) @@ -485,11 +497,9 @@ class ResourceClass(BaseAPISpec): description="A default selection for resource classes or resource pools", examples=[False], ) - tolerations: Optional[List[K8sLabel]] = Field( + tolerations: Optional[List[Toleration]] = Field( None, - description="A list of k8s labels used for tolerations", - examples=[["test-label-1"]], - min_length=0, + description="A list of toleration items used for pod scheduling in Kubernetes.\n", ) node_affinities: Optional[List[NodeAffinity]] = Field( None, @@ -541,11 +551,9 @@ class ResourceClassPatch(BaseAPISpec): description="A default selection for resource classes or resource pools", examples=[False], ) - tolerations: Optional[List[K8sLabel]] = Field( + tolerations: Optional[List[Toleration]] = Field( None, - description="A list of k8s labels used for tolerations", - examples=[["test-label-1"]], - min_length=0, + description="A list of toleration items used for pod scheduling in Kubernetes.\n", ) node_affinities: Optional[List[NodeAffinity]] = Field( None, @@ -603,11 +611,9 @@ class ResourceClassPatchWithId(BaseAPISpec): description="A default selection for resource classes or resource pools", examples=[False], ) - tolerations: Optional[List[K8sLabel]] = Field( + tolerations: Optional[List[Toleration]] 
= Field( None, - description="A list of k8s labels used for tolerations", - examples=[["test-label-1"]], - min_length=0, + description="A list of toleration items used for pod scheduling in Kubernetes.\n", ) node_affinities: Optional[List[NodeAffinity]] = Field( None, @@ -663,11 +669,9 @@ class ResourceClassWithId(BaseAPISpec): description="A default selection for resource classes or resource pools", examples=[False], ) - tolerations: Optional[List[K8sLabel]] = Field( + tolerations: Optional[List[Toleration]] = Field( None, - description="A list of k8s labels used for tolerations", - examples=[["test-label-1"]], - min_length=0, + description="A list of toleration items used for pod scheduling in Kubernetes.\n", ) node_affinities: Optional[List[NodeAffinity]] = Field( None, @@ -724,11 +728,9 @@ class ResourceClassWithIdFiltered(BaseAPISpec): examples=[False], ) matching: Optional[bool] = None - tolerations: Optional[List[K8sLabel]] = Field( + tolerations: Optional[List[Toleration]] = Field( None, - description="A list of k8s labels used for tolerations", - examples=[["test-label-1"]], - min_length=0, + description="A list of toleration items used for pod scheduling in Kubernetes.\n", ) node_affinities: Optional[List[NodeAffinity]] = Field( None, diff --git a/components/renku_data_services/crc/core.py b/components/renku_data_services/crc/core.py index 4bb88ec08..05e9ec32e 100644 --- a/components/renku_data_services/crc/core.py +++ b/components/renku_data_services/crc/core.py @@ -8,6 +8,9 @@ from renku_data_services.base_models import RESET, ResetType from renku_data_services.crc import apispec, models from renku_data_services.errors import errors +from renku_data_services.k8s.pod_scheduling import api as k8s_api +from renku_data_services.k8s.pod_scheduling import models as k8s_models +from renku_data_services.k8s.pod_scheduling import transforms as k8s_transforms def validate_quota(body: apispec.QuotaWithOptionalId) -> models.UnsavedQuota: @@ -43,7 +46,11 @@ def 
validate_resource_class(body: apispec.ResourceClass) -> models.UnsavedResour ), key=lambda x: (x.key, x.required_during_scheduling), ) - tolerations = sorted(t.root for t in body.tolerations or []) + api_tolerations = k8s_api.TolerationsField.model_validate( + [tol.model_dump(mode="json") for tol in body.tolerations or []] + ) + tolerations = k8s_transforms.transform_tolerations(body=api_tolerations) or [] + tolerations = sorted(tolerations, key=k8s_models.Toleration.sort_key) return models.UnsavedResourceClass( name=body.name, cpu=body.cpu, @@ -91,9 +98,16 @@ def validate_resource_class_patch_or_put( ), key=lambda x: (x.key, x.required_during_scheduling), ) - tolerations: list[str] | None = [] if method == "PUT" else None + # tolerations: list[str] | None = [] if method == "PUT" else None + # if body.tolerations: + # tolerations = sorted(t.root for t in body.tolerations or []) + tolerations: list[k8s_models.Toleration] | None = [] if method == "PUT" else None if body.tolerations: - tolerations = sorted(t.root for t in body.tolerations or []) + api_tolerations = k8s_api.TolerationsField.model_validate( + [tol.model_dump(mode="json") for tol in body.tolerations] + ) + tolerations = k8s_transforms.transform_tolerations(body=api_tolerations) or [] + tolerations = sorted(tolerations, key=k8s_models.Toleration.sort_key) if rc_id: return models.ResourceClassPatchWithId( id=rc_id, diff --git a/components/renku_data_services/crc/db.py b/components/renku_data_services/crc/db.py index 04c853097..35435fc6b 100644 --- a/components/renku_data_services/crc/db.py +++ b/components/renku_data_services/crc/db.py @@ -20,6 +20,7 @@ import renku_data_services.base_models as base_models from renku_data_services import errors +from renku_data_services.app_config import logging from renku_data_services.base_models import RESET from renku_data_services.crc import models from renku_data_services.crc import orm as schemas @@ -29,6 +30,8 @@ from renku_data_services.k8s.db import 
QuotaRepository from renku_data_services.users.db import UserRepo +logger = logging.getLogger(__name__) + class _Base: def __init__(self, session_maker: Callable[..., AsyncSession], quotas_repo: QuotaRepository) -> None: @@ -338,8 +341,9 @@ async def get_classes( # NOTE: The line below ensures that the right users can access the right resources, do not remove. stmt = _classes_user_access_control(api_user, stmt) - res = await session.execute(stmt) - orms = res.scalars().all() + res = await session.scalars(stmt) + orms = res.all() + logger.warning(f"Classes = {[orm.dump() for orm in orms]}") return [orm.dump() for orm in orms] async def get_resource_class(self, api_user: base_models.APIUser, id: int) -> models.ResourceClass: @@ -586,18 +590,31 @@ async def update_resource_class( cls.node_affinities.remove(existing_affinity) if update.tolerations is not None: - existing_tolerations: dict[str, schemas.TolerationORM] = {tol.key: tol for tol in cls.tolerations} - new_tolerations: dict[str, schemas.TolerationORM] = { - tol: schemas.TolerationORM(key=tol) for tol in update.tolerations - } - for new_tol_key, new_tol in new_tolerations.items(): - if new_tol_key not in existing_tolerations: - # CREATE a brand new toleration - cls.tolerations.append(new_tol) - # REMOVE a toleration - for existing_tol_key, existing_tol in existing_tolerations.items(): - if existing_tol_key not in new_tolerations: - cls.tolerations.remove(existing_tol) + # existing_tolerations: dict[str, schemas.TolerationORM] = {tol.key: tol for tol in cls.tolerations} + # new_tolerations: dict[str, schemas.TolerationORM] = { + # tol: schemas.TolerationORM(key=tol) for tol in update.tolerations + # } + # for new_tol_key, new_tol in new_tolerations.items(): + # if new_tol_key not in existing_tolerations: + # # CREATE a brand new toleration + # cls.tolerations.append(new_tol) + # # REMOVE a toleration + # for existing_tol_key, existing_tol in existing_tolerations.items(): + # if existing_tol_key not in 
new_tolerations: + # cls.tolerations.remove(existing_tol) + + # NOTE: the whole list of tolerations is updated + existing_tolerations = list(cls.new_tolerations) + for existing_tol, new_tol in zip(existing_tolerations, update.tolerations, strict=False): + existing_tol.contents = new_tol.to_dict() + + if len(update.tolerations) > len(existing_tolerations): + # Add new tolerations + for new_tol in update.tolerations[len(existing_tolerations) :]: + cls.new_tolerations.append(schemas.NewTolerationORM.from_model(new_tol)) + elif len(update.tolerations) < len(existing_tolerations): + # Remove old tolerations + cls.new_tolerations = cls.new_tolerations[: len(update.tolerations)] # NOTE: do we need to perform this check? if cls.resource_pool is None: diff --git a/components/renku_data_services/crc/models.py b/components/renku_data_services/crc/models.py index 4286760ca..ac566639c 100644 --- a/components/renku_data_services/crc/models.py +++ b/components/renku_data_services/crc/models.py @@ -10,6 +10,7 @@ from renku_data_services import errors from renku_data_services.base_models import ResetType from renku_data_services.k8s.constants import ClusterId +from renku_data_services.k8s.pod_scheduling import models as k8s_models from renku_data_services.notebooks.cr_amalthea_session import TlsSecret @@ -88,7 +89,8 @@ class UnsavedResourceClass(ResourcesCompareMixin): default: bool = False default_storage: int = 1 node_affinities: list[NodeAffinity] = field(default_factory=list) - tolerations: list[str] = field(default_factory=list) + # tolerations: list[str] = field(default_factory=list) + tolerations: list[k8s_models.Toleration] = field(default_factory=list) @dataclass(frozen=True, eq=True, kw_only=True) @@ -105,7 +107,8 @@ class ResourceClass(ResourcesCompareMixin): default_storage: int = 1 matching: Optional[bool] = None node_affinities: list[NodeAffinity] = field(default_factory=list) - tolerations: list[str] = field(default_factory=list) + # tolerations: list[str] = 
field(default_factory=list) + tolerations: list[k8s_models.Toleration] = field(default_factory=list) quota: str | None = None @@ -121,7 +124,8 @@ class ResourceClassPatch: default: bool | None = None default_storage: int | None = None node_affinities: list[NodeAffinity] | None = None - tolerations: list[str] | None = None + # tolerations: list[str] | None = None + tolerations: list[k8s_models.Toleration] | None = None @dataclass(frozen=True, eq=True, kw_only=True) diff --git a/components/renku_data_services/crc/orm.py b/components/renku_data_services/crc/orm.py index c66bbc376..5b25d0545 100644 --- a/components/renku_data_services/crc/orm.py +++ b/components/renku_data_services/crc/orm.py @@ -26,6 +26,7 @@ from renku_data_services.crc.models import ClusterSettings, SavedClusterSettings, SessionProtocol from renku_data_services.errors import errors from renku_data_services.k8s.constants import ClusterId +from renku_data_services.k8s.pod_scheduling import models as k8s_models from renku_data_services.utils.sqlalchemy import ULIDType logger = logging.getLogger(__name__) @@ -105,6 +106,12 @@ class ResourceClassORM(BaseORM): cascade="save-update, merge, delete", lazy="selectin", ) + new_tolerations: Mapped[list[NewTolerationORM]] = relationship( + back_populates="resource_class", + default_factory=list, + cascade="save-update, merge, delete", + lazy="selectin", + ) node_affinities: Mapped[list[NodeAffintyORM]] = relationship( back_populates="resource_class", default_factory=list, @@ -124,7 +131,10 @@ def from_unsaved_model( ) for affinity in new_resource_class.node_affinities ] - tolerations = [TolerationORM(key=toleration) for toleration in new_resource_class.tolerations] + # tolerations = [TolerationORM(key=toleration) for toleration in new_resource_class.tolerations] + + new_tolerations = [NewTolerationORM.from_model(tol) for tol in new_resource_class.tolerations] + return cls( name=new_resource_class.name, cpu=new_resource_class.cpu, @@ -134,7 +144,8 @@ def 
from_unsaved_model( default_storage=new_resource_class.default_storage, gpu=new_resource_class.gpu, resource_pool_id=resource_pool_id, - tolerations=tolerations, + # tolerations=tolerations, + new_tolerations=new_tolerations, node_affinities=node_affinities, ) @@ -160,7 +171,8 @@ def dump( default=self.default, default_storage=self.default_storage, node_affinities=[affinity.dump() for affinity in self.node_affinities], - tolerations=[toleration.key for toleration in self.tolerations], + # tolerations=[toleration.key for toleration in self.tolerations], + tolerations=[tol.dump() for tol in self.new_tolerations], matching=matching, quota=self.resource_pool.quota if self.resource_pool else None, ) @@ -349,6 +361,32 @@ class TolerationORM(BaseORM): id: Mapped[int] = mapped_column("id", Integer, Identity(always=True), primary_key=True, default=None, init=False) +class NewTolerationORM(BaseORM): + """Toleration items for pod scheduling.""" + + __tablename__ = "new_tolerations" + id: Mapped[int] = mapped_column("id", Integer, Identity(always=True), primary_key=True, default=None, init=False) + resource_class: Mapped[ResourceClassORM] = relationship( + back_populates="new_tolerations", lazy="selectin", init=False + ) + resource_class_id: Mapped[int] = mapped_column(ForeignKey("resource_classes.id"), index=True, init=False) + contents: Mapped[dict[str, Any]] = mapped_column(JSONVariant, nullable=False) + + def dump(self) -> k8s_models.Toleration: + """Create a toleration model from the ORM object.""" + return k8s_models.Toleration.from_dict(data=self.contents) + + @classmethod + def from_model( + cls, + toleration: k8s_models.Toleration, + ) -> NewTolerationORM: + """Create a new ORM object from an internal toleration model.""" + return cls( + contents=toleration.to_dict(), + ) + + class NodeAffintyORM(BaseORM): """The key for a K8s node label used to schedule loads specific nodes.""" diff --git a/components/renku_data_services/k8s/pod_scheduling/__init__.py 
b/components/renku_data_services/k8s/pod_scheduling/__init__.py new file mode 100644 index 000000000..dc199e233 --- /dev/null +++ b/components/renku_data_services/k8s/pod_scheduling/__init__.py @@ -0,0 +1,7 @@ +"""Common models for pod scheduling. + +Includes models for: +* node_selector +* affinity +* tolerations +""" diff --git a/components/renku_data_services/k8s/pod_scheduling/api.py b/components/renku_data_services/k8s/pod_scheduling/api.py new file mode 100644 index 000000000..7921bfe7d --- /dev/null +++ b/components/renku_data_services/k8s/pod_scheduling/api.py @@ -0,0 +1,23 @@ +"""Models to use for validating API requests and responses.""" + +from pydantic import RootModel + +from renku_data_services.notebooks.cr_amalthea_session import Affinity, Toleration + + +class AffinityField(RootModel[Affinity | None]): + """Affinity field.""" + + root: Affinity | None = None + + +class NodeSelectorField(RootModel[dict[str, str] | None]): + """Node selector field.""" + + root: dict[str, str] | None = None + + +class TolerationsField(RootModel[list[Toleration] | None]): + """Tolerations field.""" + + root: list[Toleration] | None = None diff --git a/components/renku_data_services/k8s/pod_scheduling/models.py b/components/renku_data_services/k8s/pod_scheduling/models.py new file mode 100644 index 000000000..af685d6ec --- /dev/null +++ b/components/renku_data_services/k8s/pod_scheduling/models.py @@ -0,0 +1,398 @@ +"""Models for internal logic.""" + +from dataclasses import asdict, dataclass +from enum import StrEnum +from typing import Any, Self + + +class NodeSelectorRequirementOperator(StrEnum): + """The different types of operator which can be used in NodeSelectorRequirement. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#nodeselectorrequirement-v1-core. 
+ """ + + does_not_exist = "DoesNotExist" + exists = "Exists" + greater_than = "Gt" + less_than = "Lt" + not_in = "NotIn" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class NodeSelectorRequirement: + """Node selector requirement. + + A node selector requirement is a selector that contains values, a key, + and an operator that relates the key and values. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#nodeselectorrequirement-v1-core. + + """ + + key: str + """The label key that the selector applies to.""" + + operator: NodeSelectorRequirementOperator + """Represents a key's relationship to a set of values.""" + + values: list[str] | None = None + """An array of string values.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class NodeSelectorTerm: + """Selector term for scheduling pods on nodes. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#nodeselectorterm-v1-core. + """ + + matchExpressions: list[NodeSelectorRequirement] | None = None + """A list of node selector requirements by node's labels.""" + + matchFields: list[NodeSelectorRequirement] | None = None + """A list of node selector requirements by node's fields.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class PreferredDuringSchedulingIgnoredDuringExecutionItem: + """Preference term for scheduling. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#preferredschedulingterm-v1-core. + """ + + preference: NodeSelectorTerm + """A node selector term, associated with the corresponding weight.""" + + weight: int + """Weight associated with matching the corresponding nodeSelectorTerm, in the range 1-100.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class NodeSelector: + """A node selector. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#nodeselector-v1-core. + """ + + nodeSelectorTerms: list[NodeSelectorTerm] + """Required. A list of node selector terms. 
The terms are ORed.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class NodeAffinity: + """Node affinity field. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#nodeaffinity-v1-core. + """ + + preferredDuringSchedulingIgnoredDuringExecution: ( + list[PreferredDuringSchedulingIgnoredDuringExecutionItem] | None + ) = None + """The scheduler will prefer to schedule pods to nodes that satisfy the affinity expressions specified + by this field, but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest sum of weights, + i.e. for each node that meets all of the scheduling requirements (resource request, + requiredDuringScheduling affinity expressions, etc.), compute a sum by iterating through + the elements of this field and adding "weight" to the sum if the node matches the corresponding matchExpressions; + the node(s) with the highest sum are the most preferred.""" + + requiredDuringSchedulingIgnoredDuringExecution: NodeSelector | None = None + """If the affinity requirements specified by this field are not met at scheduling time, + the pod will not be scheduled onto the node. + If the affinity requirements specified by this field cease to be met at some point + during pod execution (e.g. due to an update), + the system may or may not try to eventually evict the pod from its node.""" + + +class LabelSelectorRequirementOperator(StrEnum): + """The different types of operator which can be used in LabelSelectorRequirement. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#labelselectorrequirement-v1-meta. + """ + + does_not_exist = "DoesNotExist" + exists = "Exists" + not_in = "NotIn" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class LabelSelectorRequirement: + """Label selector requirement. + + A label selector requirement is a selector that contains values, a key, + and an operator that relates the key and values. 
+ + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#labelselectorrequirement-v1-meta. + """ + + key: str + """key is the label key that the selector applies to.""" + + operator: LabelSelectorRequirementOperator + """operator represents a key's relationship to a set of values.""" + + values: list[str] | None = None + """values is an array of string values.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class LabelSelector: + """Represents a label selector. + + A label selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty label selector matches all objects. A null label selector matches no objects. + """ + + matchExpressions: list[LabelSelectorRequirement] | None = None + """matchExpressions is a list of label selector requirements. The requirements are ANDed.""" + + matchLabels: dict[str, str] | None = None + """matchLabels is a map of {key,value} pairs. + + A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, + whose key field is "key", the operator is "In", and the values array contains only "value". + The requirements are ANDed. + """ + + +@dataclass(frozen=True, eq=True, kw_only=True) +class PodAffinityTerm: + """Pod affinity term. + + Defines a set of pods (namely those matching the labelSelector relative to the given namespace(s)) + that this pod should be co-located (affinity) or not co-located (anti-affinity) with, + where co-located is defined as running on a node whose value of the label + with key matches that of any node on which a pod of the set of pods is running. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podaffinityterm-v1-core. + """ + + labelSelector: LabelSelector | None = None + """A label query over a set of resources, in this case pods. 
+ If it's null, this PodAffinityTerm matches with no Pods.""" + + matchLabelKeys: list[str] | None = None + """MatchLabelKeys is a set of pod label keys to select which pods will be taken into consideration. + The keys are used to lookup values from the incoming pod labels, + those key-value labels are merged with `labelSelector` as `key in (value)` + to select the group of existing pods which pods will be taken into consideration + for the incoming pod's pod (anti) affinity. Keys that don't exist in the incoming pod labels will be ignored. + The default value is empty. The same key is forbidden to exist in both matchLabelKeys and labelSelector. + Also, matchLabelKeys cannot be set when labelSelector isn't set.""" + + mismatchLabelKeys: list[str] | None = None + """MismatchLabelKeys is a set of pod label keys to select which pods will be taken into consideration. + The keys are used to lookup values from the incoming pod labels, + those key-value labels are merged with `labelSelector` as `key notin (value)` + to select the group of existing pods which pods will be taken into consideration + for the incoming pod's pod (anti) affinity. Keys that don't exist in the incoming pod labels will be ignored. + The default value is empty. The same key is forbidden to exist in both mismatchLabelKeys and labelSelector. + Also, mismatchLabelKeys cannot be set when labelSelector isn't set.""" + + namespaceSelector: LabelSelector | None = None + """A label query over the set of namespaces that the term applies to. + The term is applied to the union of the namespaces selected by this field + and the ones listed in the namespaces field. + null selector and null or empty namespaces list means "this pod's namespace". + An empty selector ({}) matches all namespaces.""" + + namespaces: list[str] | None = None + """namespaces specifies a static list of namespace names that the term applies to. 
+ The term is applied to the union of the namespaces listed in this field + and the ones selected by namespaceSelector. + null or empty namespaces list and null namespaceSelector means "this pod's namespace".""" + + topologyKey: str + """This pod should be co-located (affinity) or not co-located (anti-affinity) + with the pods matching the labelSelector in the specified namespaces, + where co-located is defined as running on a node whose value of the label + with key topologyKey matches that of any node on which any of the selected pods is running. + Empty topologyKey is not allowed.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class WeightedPodAffinityTerm: + """Weighted pod affinity term. + + The weights of all of the matched WeightedPodAffinityTerm fields are added per-node + to find the most preferred node(s). + """ + + podAffinityTerm: PodAffinityTerm + """Required. A pod affinity term, associated with the corresponding weight.""" + + weight: int + """weight associated with matching the corresponding podAffinityTerm, in the range 1-100.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class PodAffinity: + """Pod affinity field. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podaffinity-v1-core. + """ + + preferredDuringSchedulingIgnoredDuringExecution: list[WeightedPodAffinityTerm] | None = None + """The scheduler will prefer to schedule pods to nodes that satisfy + the affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest sum of weights, + i.e. 
for each node that meets all of the scheduling requirements (resource request, + requiredDuringScheduling affinity expressions, etc.), compute a sum + by iterating through the elements of this field and adding "weight" to the sum + if the node has pods which matches the corresponding podAffinityTerm; + the node(s) with the highest sum are the most preferred.""" + + requiredDuringSchedulingIgnoredDuringExecution: list[PodAffinityTerm] | None = None + """If the affinity requirements specified by this field are not met at scheduling time, + the pod will not be scheduled onto the node. + If the affinity requirements specified by this field cease to be met at some point + during pod execution (e.g. due to a pod label update), + the system may or may not try to eventually evict the pod from its node. + When there are multiple elements, the lists of nodes corresponding to each podAffinityTerm are intersected, + i.e. all terms must be satisfied.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class PodAntiAffinity: + """Pod anti-affinity field. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podantiaffinity-v1-core. + """ + + preferredDuringSchedulingIgnoredDuringExecution: list[WeightedPodAffinityTerm] | None = None + """The scheduler will prefer to schedule pods to nodes that satisfy + the anti-affinity expressions specified by this field, + but it may choose a node that violates one or more of the expressions. + The node that is most preferred is the one with the greatest sum of weights, + i.e. 
for each node that meets all of the scheduling requirements (resource request, + requiredDuringScheduling anti-affinity expressions, etc.), compute a sum + by iterating through the elements of this field and subtracting "weight" from the sum + if the node has pods which matches the corresponding podAffinityTerm; + the node(s) with the highest sum are the most preferred.""" + + requiredDuringSchedulingIgnoredDuringExecution: list[PodAffinityTerm] | None = None + """If the anti-affinity requirements specified by this field are not met at scheduling time, + the pod will not be scheduled onto the node. + If the anti-affinity requirements specified by this field cease to be met at some point + during pod execution (e.g. due to a pod label update), + the system may or may not try to eventually evict the pod from its node. + When there are multiple elements, the lists of nodes corresponding to each podAffinityTerm are intersected, + i.e. all terms must be satisfied.""" + + +@dataclass(frozen=True, eq=True, kw_only=True) +class Affinity: + """Affinity field. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#affinity-v1-core. + """ + + nodeAffinity: NodeAffinity | None = None + """Describes node affinity scheduling rules for the pod.""" + + podAffinity: PodAffinity | None = None + """Describes pod affinity scheduling rules + (e.g. co-locate this pod in the same node, zone, etc. as some other pod(s)).""" + + podAntiAffinity: PodAntiAffinity | None = None + """Describes pod anti-affinity scheduling rules + (e.g. avoid putting this pod in the same node, zone, etc. as some other pod(s)).""" + + +class TolerationEffect(StrEnum): + """The different types of effects of a taint. + + See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#toleration-v1-core. 
+    """
+
+    no_execute = "NoExecute"
+    no_schedule = "NoSchedule"
+    prefer_no_schedule = "PreferNoSchedule"
+
+
+class TolerationOperator(StrEnum):
+    """The different types of operators of a toleration.
+
+    See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#toleration-v1-core.
+    """
+
+    equal = "Equal"
+    exists = "Exists"
+
+
+@dataclass(frozen=True, eq=True, kw_only=True)
+class Toleration:
+    """Toleration term.
+
+    The pod this Toleration is attached to tolerates any taint that matches
+    the triple <key,value,effect> using the matching operator <operator>.
+
+    See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#toleration-v1-core.
+    """
+
+    effect: TolerationEffect | None = None
+    """Effect indicates the taint effect to match."""
+
+    key: str | None = None
+    """Key is the taint key that the toleration applies to.
+    Empty means match all taint keys.
+    If the key is empty, operator must be Exists; this combination means to match all values and all keys."""
+
+    operator: TolerationOperator | None = None
+    """Operator represents a key's relationship to the value."""
+
+    tolerationSeconds: int | None = None
+    """TolerationSeconds represents the period of time the toleration
+    (which must be of effect NoExecute, otherwise this field is ignored) tolerates the taint.
+    By default, it is not set, which means tolerate the taint forever (do not evict).
+    Zero and negative values will be treated as 0 (evict immediately) by the system."""
+
+    value: str | None = None
+    """Value is the taint value the toleration matches to.
+ If the operator is Exists, the value should be empty, otherwise just a regular string.""" + + def to_dict(self) -> dict[str, Any]: + """Converts this instance into a dictionary for serialization.""" + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Self: + """Convert a dict object into a Toleration instance.""" + data_effect = data.get("effect") + effect = TolerationEffect(data_effect) if data_effect else None + data_operator = data.get("operator") + operator = TolerationOperator(data_operator) if data_operator else None + return cls( + effect=effect, + key=data.get("key"), + operator=operator, + tolerationSeconds=data.get("tolerationSeconds"), + value=data.get("value"), + ) + + @staticmethod + def sort_key(item: "Toleration") -> tuple[str, str, str, str]: + """Key for sorting toleration items.""" + return (item.key or "", item.operator or "", item.effect or "", item.value or "") + + +type AffinityField = Affinity | None +"""Affinity field. + +See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#affinity-v1-core. +""" + + +type NodeSelectorField = dict[str, str] | None +"""Node selector field. + +See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podspec-v1-core. +""" + +type TolerationsField = list[Toleration] | None +"""Tolerations field. + +See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podspec-v1-core. 
+""" diff --git a/components/renku_data_services/k8s/pod_scheduling/transforms.py b/components/renku_data_services/k8s/pod_scheduling/transforms.py new file mode 100644 index 000000000..2adf35513 --- /dev/null +++ b/components/renku_data_services/k8s/pod_scheduling/transforms.py @@ -0,0 +1,23 @@ +"""Functions to transform API models into internal ones.""" + +from renku_data_services.k8s.pod_scheduling import api, models + + +def transform_tolerations(body: api.TolerationsField) -> models.TolerationsField: + """Transforms tolerations.""" + if body.root is None: + return None + return [transform_toleration(body=tol) for tol in body.root] + + +def transform_toleration(body: api.Toleration) -> models.Toleration: + """Transforms a toleration item.""" + effect = models.TolerationEffect(body.effect) if body.effect else None + operator = models.TolerationOperator(body.operator) if body.operator else None + return models.Toleration( + effect=effect, + key=body.key, + operator=operator, + tolerationSeconds=body.tolerationSeconds, + value=body.value, + ) diff --git a/components/renku_data_services/migrations/versions/4c18f553bfa0_feat_support_full_k8s_pod_scheduling.py b/components/renku_data_services/migrations/versions/4c18f553bfa0_feat_support_full_k8s_pod_scheduling.py new file mode 100644 index 000000000..86740c525 --- /dev/null +++ b/components/renku_data_services/migrations/versions/4c18f553bfa0_feat_support_full_k8s_pod_scheduling.py @@ -0,0 +1,54 @@ +"""feat: support full k8s pod scheduling + +Revision ID: 4c18f553bfa0 +Revises: d437be68a4fb +Create Date: 2025-10-31 08:10:15.948917 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "4c18f553bfa0" +down_revision = "d437be68a4fb" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "new_tolerations", + sa.Column("id", sa.Integer(), sa.Identity(always=True), nullable=False), + sa.Column("resource_class_id", sa.Integer(), nullable=False), + sa.Column( + "contents", sa.JSON().with_variant(postgresql.JSONB(astext_type=sa.Text()), "postgresql"), nullable=False + ), + sa.ForeignKeyConstraint( + ["resource_class_id"], + ["resource_pools.resource_classes.id"], + ), + sa.PrimaryKeyConstraint("id"), + schema="resource_pools", + ) + op.create_index( + op.f("ix_resource_pools_new_tolerations_resource_class_id"), + "new_tolerations", + ["resource_class_id"], + unique=False, + schema="resource_pools", + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index( + op.f("ix_resource_pools_new_tolerations_resource_class_id"), + table_name="new_tolerations", + schema="resource_pools", + ) + op.drop_table("new_tolerations", schema="resource_pools") + # ### end Alembic commands ### diff --git a/components/renku_data_services/notebooks/api/amalthea_patches/general.py b/components/renku_data_services/notebooks/api/amalthea_patches/general.py index 1b8d856d4..c82b83b87 100644 --- a/components/renku_data_services/notebooks/api/amalthea_patches/general.py +++ b/components/renku_data_services/notebooks/api/amalthea_patches/general.py @@ -33,7 +33,7 @@ def session_tolerations(server: UserServer) -> list[dict[str, Any]]: "op": "add", "path": "/statefulset/spec/template/spec/tolerations", "value": default_tolerations - + [toleration.json_match_expression() for toleration in server.server_options.tolerations], + + [toleration.to_dict() for toleration in server.server_options.tolerations], } ], } diff --git a/components/renku_data_services/notebooks/api/schemas/server_options.py b/components/renku_data_services/notebooks/api/schemas/server_options.py index cdd6fc096..7adad2128 100644 --- a/components/renku_data_services/notebooks/api/schemas/server_options.py +++ 
b/components/renku_data_services/notebooks/api/schemas/server_options.py @@ -7,6 +7,7 @@ from marshmallow import Schema, fields from renku_data_services.crc.models import ResourceClass +from renku_data_services.k8s.pod_scheduling import models as k8s_models from renku_data_services.notebooks.api.schemas.custom_fields import ByteSizeField, CpuField, GpuField from renku_data_services.notebooks.config.dynamic import CPUEnforcement from renku_data_services.notebooks.errors.programming import ProgrammingError @@ -27,18 +28,18 @@ def json_match_expression(self) -> dict[str, str]: } -@dataclass -class Toleration: - """Toleration used to schedule a session on tainted nodes.""" +# @dataclass +# class Toleration: +# """Toleration used to schedule a session on tainted nodes.""" - key: str +# key: str - def json_match_expression(self) -> dict[str, Any]: - """Create match expression for this class.""" - return { - "key": self.key, - "operator": "Exists", - } +# def json_match_expression(self) -> dict[str, Any]: +# """Create match expression for this class.""" +# return { +# "key": self.key, +# "operator": "Exists", +# } @dataclass @@ -54,7 +55,7 @@ class ServerOptions: gigabytes: bool = False priority_class: Optional[str] = None node_affinities: list[NodeAffinity] = field(default_factory=list) - tolerations: list[Toleration] = field(default_factory=list) + tolerations: list[k8s_models.Toleration] = field(default_factory=list) resource_class_id: Optional[int] = None idle_threshold_seconds: Optional[int] = None hibernation_threshold_seconds: Optional[int] = None @@ -69,7 +70,7 @@ def __post_init__(self) -> None: message="Cannot create a ServerOptions dataclass with node " "affinities that are not of type NodeAffinity" ) - if not all([isinstance(toleration, Toleration) for toleration in self.tolerations]): + if not all([isinstance(toleration, k8s_models.Toleration) for toleration in self.tolerations]): raise ProgrammingError( message="Cannot create a ServerOptions dataclass with 
tolerations that are not of type Toleration" ) @@ -83,7 +84,8 @@ def __post_init__(self) -> None: if not self.tolerations: self.tolerations = [] else: - self.tolerations = sorted(self.tolerations, key=lambda x: x.key) + self.tolerations = sorted(self.tolerations, key=k8s_models.Toleration.sort_key) + # self.tolerations = sorted(self.tolerations, key=lambda x: x.key) def __compare( self, @@ -191,7 +193,8 @@ def from_resource_class(cls, data: ResourceClass) -> Self: NodeAffinity(key=a.key, required_during_scheduling=a.required_during_scheduling) for a in data.node_affinities ], - tolerations=[Toleration(t) for t in data.tolerations], + # tolerations=[Toleration(t) for t in data.tolerations], + tolerations=data.tolerations, resource_class_id=data.id, ) diff --git a/components/renku_data_services/notebooks/utils.py b/components/renku_data_services/notebooks/utils.py index e9cdd3b5d..427009ce2 100644 --- a/components/renku_data_services/notebooks/utils.py +++ b/components/renku_data_services/notebooks/utils.py @@ -135,5 +135,7 @@ def tolerations_from_resource_class( output: list[Toleration] = [] output.extend(default_tolerations) for tol in resource_class.tolerations: - output.append(Toleration(key=tol, operator="Exists")) + # output.append(Toleration(key=tol, operator="Exists")) + output.append(Toleration.model_validate(tol.to_dict())) + return output diff --git a/components/renku_data_services/session/config.py b/components/renku_data_services/session/config.py index 92098e2b9..2a22dc3e4 100644 --- a/components/renku_data_services/session/config.py +++ b/components/renku_data_services/session/config.py @@ -18,7 +18,8 @@ class BuildsConfig: enabled: bool = False build_output_image_prefix: str | None = None - vscodium_python_run_image: str | None = None + build_builder_image: str | None = None + build_run_image: str | None = None build_strategy_name: str | None = None push_secret_name: str | None = None buildrun_retention_after_failed: timedelta | None = None @@ -32,7 
+33,8 @@ def from_env(cls) -> "BuildsConfig": """Create a config from environment variables.""" enabled = os.environ.get("IMAGE_BUILDERS_ENABLED", "false").lower() == "true" build_output_image_prefix = os.environ.get("BUILD_OUTPUT_IMAGE_PREFIX") - vscodium_python_run_image = os.environ.get("BUILD_VSCODIUM_PYTHON_RUN_IMAGE") + build_builder_image = os.environ.get("BUILD_BUILDER_IMAGE") + build_run_image = os.environ.get("BUILD_RUN_IMAGE") build_strategy_name = os.environ.get("BUILD_STRATEGY_NAME") push_secret_name = os.environ.get("BUILD_PUSH_SECRET_NAME") buildrun_retention_after_failed_seconds = int(os.environ.get("BUILD_RUN_RETENTION_AFTER_FAILED_SECONDS") or "0") @@ -76,7 +78,8 @@ def from_env(cls) -> "BuildsConfig": return cls( enabled=enabled or False, build_output_image_prefix=build_output_image_prefix or None, - vscodium_python_run_image=vscodium_python_run_image or None, + build_builder_image=build_builder_image, + build_run_image=build_run_image, build_strategy_name=build_strategy_name or None, push_secret_name=push_secret_name or None, buildrun_retention_after_failed=buildrun_retention_after_failed, diff --git a/components/renku_data_services/session/constants.py b/components/renku_data_services/session/constants.py index 7b5956980..93193722e 100644 --- a/components/renku_data_services/session/constants.py +++ b/components/renku_data_services/session/constants.py @@ -14,16 +14,16 @@ BUILD_OUTPUT_IMAGE_NAME: Final[str] = "renku-build" """The container image name created from Renku builds.""" -BUILD_BUILDER_IMAGE: Final[str] = "ghcr.io/swissdatasciencecenter/renku-frontend-buildpacks/selector:0.1.0" +BUILD_DEFAULT_BUILDER_IMAGE: Final[str] = "ghcr.io/swissdatasciencecenter/renku-frontend-buildpacks/selector:0.1.0" -BUILD_RUN_IMAGE: Final[str] = "ghcr.io/swissdatasciencecenter/renku-frontend-buildpacks/base-image:0.1.0" +BUILD_DEFAULT_RUN_IMAGE: Final[str] = "ghcr.io/swissdatasciencecenter/renku-frontend-buildpacks/base-image:0.1.0" BUILD_MOUNT_DIRECTORY: 
Final[PurePosixPath] = PurePosixPath("/home/renku/work") BUILD_WORKING_DIRECTORY: Final[PurePosixPath] = BUILD_MOUNT_DIRECTORY BUILD_UID: Final[int] = 1000 BUILD_GID: Final[int] = 1000 BUILD_PORT: Final[int] = 8888 -DEFAULT_URLS: Final[dict[str, str]] = { - "vscodium": "/", +BUILD_DEFAULT_URL_PATH: Final[str] = "/" +BUILD_URL_PATH_MAP: Final[dict[str, str]] = { "jupyterlab": "/lab", } diff --git a/components/renku_data_services/session/db.py b/components/renku_data_services/session/db.py index 72323a402..de88167ae 100644 --- a/components/renku_data_services/session/db.py +++ b/components/renku_data_services/session/db.py @@ -435,7 +435,9 @@ async def insert_launcher( created_by_id=user.id, description=f"Generated environment for {launcher.name}", container_image="image:unknown-at-the-moment", # TODO: This should come from the build - default_url=constants.DEFAULT_URLS.get(launcher.environment.frontend_variant, "/"), + default_url=constants.BUILD_URL_PATH_MAP.get( + launcher.environment.frontend_variant, constants.BUILD_DEFAULT_URL_PATH + ), port=constants.BUILD_PORT, # TODO: This should come from the build working_directory=constants.BUILD_WORKING_DIRECTORY, # TODO: This should come from the build mount_directory=constants.BUILD_MOUNT_DIRECTORY, # TODO: This should come from the build @@ -1087,8 +1089,8 @@ def _get_buildrun_params( return models.ShipwrightBuildRunParams( name=build.k8s_name, git_repository=git_repository, - build_image=constants.BUILD_BUILDER_IMAGE, - run_image=constants.BUILD_RUN_IMAGE, + build_image=self.builds_config.build_builder_image or constants.BUILD_DEFAULT_BUILDER_IMAGE, + run_image=self.builds_config.build_run_image or constants.BUILD_DEFAULT_RUN_IMAGE, output_image=output_image, build_strategy_name=build_strategy_name, push_secret_name=push_secret_name,