diff --git a/doc/source/ray-core/doc_code/direct_transport_gloo.py b/doc/source/ray-core/doc_code/direct_transport_gloo.py index 866b827db960..392a66a510f2 100644 --- a/doc/source/ray-core/doc_code/direct_transport_gloo.py +++ b/doc/source/ray-core/doc_code/direct_transport_gloo.py @@ -140,7 +140,7 @@ def sum(self, tensor: torch.Tensor): ray.get(tensor) assert ( - "Currently ray.get() only supports OBJECT_STORE and NIXL tensor transport, got TensorTransportEnum.GLOO, please specify the correct tensor transport in ray.get()" + "Trying to use two-sided tensor transport: GLOO for ray.get. This is only supported for one-sided transports such as NIXL or the OBJECT_STORE." in str(e.value) ) diff --git a/python/ray/_private/custom_types.py b/python/ray/_private/custom_types.py index fc5e3a5622fb..76a29bb192bd 100644 --- a/python/ray/_private/custom_types.py +++ b/python/ray/_private/custom_types.py @@ -124,9 +124,7 @@ # See `common.proto` for more details. class TensorTransportEnum(Enum): OBJECT_STORE = TensorTransport.Value("OBJECT_STORE") - NCCL = TensorTransport.Value("NCCL") - GLOO = TensorTransport.Value("GLOO") - NIXL = TensorTransport.Value("NIXL") + DIRECT_TRANSPORT = TensorTransport.Value("DIRECT_TRANSPORT") @classmethod def from_str(cls, name: str) -> "TensorTransportEnum": diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 346e2e2a0e5e..443d676a455b 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -801,7 +801,7 @@ def put_object( value: Any, owner_address: Optional[str] = None, _is_experimental_channel: bool = False, - _tensor_transport: str = "object_store", + _tensor_transport: str = TensorTransportEnum.OBJECT_STORE.name, ): """Put value in the local object store. @@ -835,18 +835,15 @@ def put_object( "ray.ObjectRef in a list and call 'put' on it." ) tensors = None - tensor_transport: TensorTransportEnum = TensorTransportEnum.from_str( - _tensor_transport + from ray.experimental.gpu_object_manager.util import ( + normalize_and_validate_tensor_transport, + validate_one_sided, ) - if tensor_transport not in [ - TensorTransportEnum.OBJECT_STORE, - TensorTransportEnum.NIXL, - ]: - raise ValueError( - "Currently, Ray Direct Transport only supports 'object_store' and 'nixl' for tensor transport in ray.put()." - ) + + tensor_transport = normalize_and_validate_tensor_transport(_tensor_transport) + validate_one_sided(tensor_transport, "ray.put") try: - if tensor_transport != TensorTransportEnum.OBJECT_STORE: + if tensor_transport != TensorTransportEnum.OBJECT_STORE.name: ( serialized_value, tensors, @@ -867,19 +864,24 @@ def put_object( # object. Instead, clients will keep the object pinned. pin_object = not _is_experimental_channel + tensor_transport_enum = TensorTransportEnum.OBJECT_STORE + if tensor_transport != TensorTransportEnum.OBJECT_STORE.name: + tensor_transport_enum = TensorTransportEnum.DIRECT_TRANSPORT + # This *must* be the first place that we construct this python # ObjectRef because an entry with 0 local references is created when # the object is Put() in the core worker, expecting that this python # reference will be created. If another reference is created and # removed before this one, it will corrupt the state in the # reference counter. 
+ ret = self.core_worker.put_object( serialized_value, pin_object=pin_object, owner_address=owner_address, inline_small_object=True, _is_experimental_channel=_is_experimental_channel, - tensor_transport_val=tensor_transport.value, + tensor_transport_val=tensor_transport_enum.value, ) if tensors: self.gpu_object_manager.put_object(ret, tensor_transport, tensors) @@ -896,43 +898,26 @@ def deserialize_objects( self, serialized_objects, object_refs, - tensor_transport_hint: Optional[TensorTransportEnum] = None, + tensor_transport_hint: Optional[str] = None, ): gpu_objects: Dict[str, List["torch.Tensor"]] = {} for obj_ref, (_, _, tensor_transport) in zip(object_refs, serialized_objects): - # TODO: Here tensor_transport_hint is set by the user in ray.get(), tensor_transport is set - # in serialize_objects by ray.method(tensor_transport="xxx"), and obj_ref.tensor_transport() - # is set by ray.put(). We may clean up this logic in the future. if ( tensor_transport is None or tensor_transport == TensorTransportEnum.OBJECT_STORE - ) and ( - obj_ref is None - or obj_ref.tensor_transport() == TensorTransportEnum.OBJECT_STORE.value ): # The object is not a gpu object, so we cannot use other external transport to # fetch it. continue - # If the object is a gpu object, we can choose to use the object store or other external - # transport to fetch it. The `tensor_transport_hint` has the highest priority, then the - # tensor_transport in obj_ref.tensor_transport(), then the tensor_transport in serialize_objects, - # then the default value `OBJECT_STORE`. - chosen_tensor_transport = ( - tensor_transport_hint - or ( - TensorTransportEnum(obj_ref.tensor_transport()) if obj_ref else None - ) - or tensor_transport - or TensorTransportEnum.OBJECT_STORE - ) - object_id = obj_ref.hex() if object_id not in gpu_objects: # If using a non-object store transport, then tensors will be sent # out-of-band. Get them before deserializing the object store data. + # The user can choose OBJECT_STORE as the hint to fetch the RDT object + # through the object store. gpu_objects[object_id] = self.gpu_object_manager.get_gpu_object( - object_id, tensor_transport=chosen_tensor_transport + object_id, tensor_transport=tensor_transport_hint ) # Function actor manager or the import thread may call pickle.loads @@ -983,16 +968,6 @@ def get_objects( f"Attempting to call `get` on the value {object_ref}, " "which is not an ray.ObjectRef." ) - tensor_transport: TensorTransportEnum = ( - TensorTransportEnum.from_str(_tensor_transport) - if _tensor_transport is not None - else None - ) - assert tensor_transport in [ - TensorTransportEnum.OBJECT_STORE, - TensorTransportEnum.NIXL, - None, - ], "Currently, RDT only supports 'object_store' and 'nixl' for tensor transport in ray.get()." 
timeout_ms = ( int(timeout * 1000) if timeout is not None and timeout != -1 else -1 ) @@ -1004,7 +979,7 @@ def get_objects( ) debugger_breakpoint = b"" - for data, metadata, _ in serialized_objects: + for _, metadata, _ in serialized_objects: if metadata: metadata_fields = metadata.split(b",") if len(metadata_fields) >= 2 and metadata_fields[1].startswith( @@ -1016,8 +991,17 @@ def get_objects( if skip_deserialization: return None, debugger_breakpoint + if _tensor_transport is not None: + from ray.experimental.gpu_object_manager.util import ( + normalize_and_validate_tensor_transport, + ) + + _tensor_transport = normalize_and_validate_tensor_transport( + _tensor_transport + ) + values = self.deserialize_objects( - serialized_objects, object_refs, tensor_transport_hint=tensor_transport + serialized_objects, object_refs, tensor_transport_hint=_tensor_transport ) if not return_exceptions: # Raise exceptions instead of returning them to the user. diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 631eca7631b4..fc2a816a4194 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2874,7 +2874,7 @@ cdef class CoreWorker: c_object_ids, timeout_ms, results) check_status(op_status) - return RayObjectsToSerializedRayObjects(results) + return RayObjectsToSerializedRayObjects(results, object_refs) def get_if_local(self, object_refs): """Get objects from local plasma store directly @@ -2886,7 +2886,7 @@ cdef class CoreWorker: check_status( CCoreWorkerProcess.GetCoreWorker().GetIfLocal( c_object_ids, &results)) - return RayObjectsToSerializedRayObjects(results) + return RayObjectsToSerializedRayObjects(results, object_refs) def object_exists(self, ObjectRef object_ref, memory_store_only=False): cdef: diff --git a/python/ray/actor.py b/python/ray/actor.py index 851330bf084d..b6f0024aa40c 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -473,9 +473,14 @@ def annotate_method(method: Callable[_P, _Ret]): if "enable_task_events" in kwargs and kwargs["enable_task_events"] is not None: method.__ray_enable_task_events__ = kwargs["enable_task_events"] if "tensor_transport" in kwargs: - method.__ray_tensor_transport__ = TensorTransportEnum.from_str( - kwargs["tensor_transport"] + tensor_transport = kwargs["tensor_transport"] + from ray.experimental.gpu_object_manager.util import ( + normalize_and_validate_tensor_transport, ) + + tensor_transport = normalize_and_validate_tensor_transport(tensor_transport) + method.__ray_tensor_transport__ = tensor_transport + return method # Check if decorator is called without parentheses (args[0] would be the function) @@ -521,7 +526,7 @@ def __init__( enable_task_events: bool, decorator: Optional[Any] = None, signature: Optional[List[inspect.Parameter]] = None, - tensor_transport: Optional[TensorTransportEnum] = None, + tensor_transport: Optional[str] = None, ): """Initialize an _ActorMethodMetadata. @@ -599,7 +604,7 @@ def __init__( enable_task_events: bool, decorator=None, signature: Optional[List[inspect.Parameter]] = None, - tensor_transport: Optional[TensorTransportEnum] = None, + tensor_transport: Optional[str] = None, ): """Initialize an ActorMethod. @@ -649,10 +654,10 @@ def __init__( # and return the resulting ObjectRefs. self._decorator = decorator - # If the task call doesn't specify a tensor transport option, use `_tensor_transport` + # If the task call doesn't specify a tensor transport option, use `OBJECT_STORE` # as the default transport for this actor method. 
if tensor_transport is None: - tensor_transport = TensorTransportEnum.OBJECT_STORE + tensor_transport = TensorTransportEnum.OBJECT_STORE.name self._tensor_transport = tensor_transport def __call__(self, *args, **kwargs): @@ -695,7 +700,12 @@ def options(self, **options): tensor_transport = options.get("tensor_transport", None) if tensor_transport is not None: - options["tensor_transport"] = TensorTransportEnum.from_str(tensor_transport) + from ray.experimental.gpu_object_manager.util import ( + normalize_and_validate_tensor_transport, + ) + + tensor_transport = normalize_and_validate_tensor_transport(tensor_transport) + options["tensor_transport"] = tensor_transport class FuncWrapper: def remote(self, *args, **kwargs): @@ -800,7 +810,7 @@ def _remote( concurrency_group=None, _generator_backpressure_num_objects=None, enable_task_events=None, - tensor_transport: Optional[TensorTransportEnum] = None, + tensor_transport: Optional[str] = None, ): if num_returns is None: num_returns = self._num_returns @@ -820,15 +830,15 @@ def _remote( if tensor_transport is None: tensor_transport = self._tensor_transport - if tensor_transport != TensorTransportEnum.OBJECT_STORE: + if tensor_transport != TensorTransportEnum.OBJECT_STORE.name: if num_returns != 1: raise ValueError( - f"Currently, methods with tensor_transport={tensor_transport.name} only support 1 return value. " + f"Currently, methods with tensor_transport={tensor_transport} only support 1 return value. " "Please make sure the actor method is decorated with `@ray.method(num_returns=1)` (the default)." ) if not self._actor._ray_enable_tensor_transport: raise ValueError( - f'Currently, methods with .options(tensor_transport="{tensor_transport.name}") are not supported when enable_tensor_transport=False. ' + f'Currently, methods with .options(tensor_transport="{tensor_transport}") are not supported when enable_tensor_transport=False. ' "Please set @ray.remote(enable_tensor_transport=True) on the actor class definition." ) gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager @@ -836,7 +846,7 @@ def _remote( self._actor, tensor_transport ): raise ValueError( - f'{self._actor} does not have tensor transport {tensor_transport.name} available. If using a collective-based transport ("nccl" or "gloo"), please create a communicator with ' + f'{self._actor} does not have tensor transport {tensor_transport} available. If using a collective-based transport ("nccl" or "gloo"), please create a communicator with ' "`ray.experimental.collective.create_collective_group` " "before calling actor tasks with non-default tensor_transport." ) @@ -876,7 +886,7 @@ def invocation(args, kwargs): invocation = self._decorator(invocation) object_refs = invocation(args, kwargs) - if tensor_transport != TensorTransportEnum.OBJECT_STORE: + if tensor_transport != TensorTransportEnum.OBJECT_STORE.name: # Currently, we only support transfer tensor out-of-band when # num_returns is 1. assert isinstance(object_refs, ObjectRef) @@ -979,14 +989,16 @@ def create( self.enable_task_events = {} self.generator_backpressure_num_objects = {} self.concurrency_group_for_methods = {} - self.method_name_to_tensor_transport: Dict[str, TensorTransportEnum] = {} + self.method_name_to_tensor_transport: Dict[str, str] = {} # Check whether any actor methods specify a non-default tensor transport. 
self.has_tensor_transport_methods = any( getattr( - method, "__ray_tensor_transport__", TensorTransportEnum.OBJECT_STORE + method, + "__ray_tensor_transport__", + TensorTransportEnum.OBJECT_STORE.name, ) - != TensorTransportEnum.OBJECT_STORE + != TensorTransportEnum.OBJECT_STORE.name for _, method in actor_methods ) @@ -1941,7 +1953,7 @@ def __init__( method_generator_backpressure_num_objects: Dict[str, int], method_enable_task_events: Dict[str, bool], enable_tensor_transport: bool, - method_name_to_tensor_transport: Dict[str, TensorTransportEnum], + method_name_to_tensor_transport: Dict[str, str], actor_method_cpus: int, actor_creation_function_descriptor, cluster_and_job, @@ -1968,7 +1980,7 @@ def __init__( this actor. If True, then methods can be called with .options(tensor_transport=...) to specify a non-default tensor transport. - method_name_to_tensor_transport: Dictionary mapping method names to their tensor transport settings. + method_name_to_tensor_transport: Dictionary mapping method names to their tensor transport type. actor_method_cpus: The number of CPUs required by actor methods. actor_creation_function_descriptor: The function descriptor for actor creation. cluster_and_job: The cluster and job information. @@ -2079,7 +2091,7 @@ def _actor_method_call( concurrency_group_name: Optional[str] = None, generator_backpressure_num_objects: Optional[int] = None, enable_task_events: Optional[bool] = None, - tensor_transport: Optional[TensorTransportEnum] = None, + tensor_transport: Optional[str] = None, ): """Method execution stub for an actor handle. @@ -2157,6 +2169,12 @@ def _actor_method_call( if generator_backpressure_num_objects is None: generator_backpressure_num_objects = -1 + tensor_transport_enum = TensorTransportEnum.OBJECT_STORE + if ( + tensor_transport is not None + and tensor_transport != TensorTransportEnum.OBJECT_STORE.name + ): + tensor_transport_enum = TensorTransportEnum.DIRECT_TRANSPORT object_refs = worker.core_worker.submit_actor_task( self._ray_actor_language, self._ray_actor_id, @@ -2171,7 +2189,7 @@ def _actor_method_call( concurrency_group_name if concurrency_group_name is not None else b"", generator_backpressure_num_objects, enable_task_events, - tensor_transport.value, + tensor_transport_enum.value, ) if num_returns == STREAMING_GENERATOR_RETURN: diff --git a/python/ray/experimental/collective/collective.py b/python/ray/experimental/collective/collective.py index 1872ace6fd8e..d66542d44c7e 100644 --- a/python/ray/experimental/collective/collective.py +++ b/python/ray/experimental/collective/collective.py @@ -43,7 +43,7 @@ def remove_remote_communicator(self, name: str): def get_collective_groups( self, actors: Optional[List[ray.actor.ActorHandle]] = None, - backend: Optional[str] = None, + backend: Optional[Backend] = None, ): """ Get the collective groups that the given actors are a subset of. Filter by @@ -62,28 +62,6 @@ def get_collective_groups( return collectives -def _do_init_collective_group( - self, - world_size: int, - rank: int, - backend: str = Backend.NCCL, - name: str = "default", -): - """Helper method that runs as a task on a remote actor to create a - collective group. - """ - ray.util.collective.init_collective_group( - world_size, rank, backend, group_name=name - ) - - -def _do_destroy_collective_group(self, name): - """Helper method that runs as a task on a remote actor to destroy a - collective group. 
- """ - ray.util.collective.destroy_collective_group(name) - - @PublicAPI(stability="alpha") def get_collective_groups( actors: List[ray.actor.ActorHandle], backend: Optional[str] = None @@ -102,6 +80,7 @@ def get_collective_groups( A list of communicator handles that the actors are a subset of. """ manager = RemoteCommunicatorManager.get() + backend = Backend(backend) if backend is not None else None return manager.get_collective_groups(actors, backend) @@ -163,10 +142,16 @@ def create_collective_group( metadata_key = get_master_address_metadata_key(name) internal_kv._internal_kv_put(metadata_key, f"{master_addr}:{master_port}") + def _do_init_collective_group(self, rank: int): + ray.util.collective.init_collective_group( + world_size, rank, backend, group_name=name + ) + try: init_tasks = [ actor.__ray_call__.remote( - _do_init_collective_group, world_size, rank, backend, name + _do_init_collective_group, + rank, ) for rank, actor in enumerate(actors) ] @@ -178,9 +163,7 @@ def create_collective_group( internal_kv._internal_kv_del(metadata_key) # Group was successfully created. - # Register GLOO groups under TORCH_GLOO since GLOO uses torch.distributed. - registration_backend = Backend.TORCH_GLOO if backend == Backend.GLOO else backend - comm = CommunicatorHandle(actors, name, registration_backend) + comm = CommunicatorHandle(actors, name, backend) manager.add_remote_communicator(comm) return comm @@ -206,9 +189,13 @@ def destroy_collective_group(group_or_name: Union[CommunicatorHandle, str]): manager = RemoteCommunicatorManager.get() group = manager.remove_remote_communicator(name) if group is not None: + + def _do_destroy_collective_group(self): + ray.util.collective.destroy_collective_group(name) + destroy_tasks = [ actor.__ray_call__.options(concurrency_group="_ray_system").remote( - _do_destroy_collective_group, name + _do_destroy_collective_group ) for actor in group.actors ] diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 1876ebeba311..ef18b07ca711 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -27,7 +27,6 @@ # GPUObjectMeta is a named tuple containing the source actor, tensor transport # backend, tensor metadata, and other information that needs to be recorded. # - The tensor transport backend is the backend used to transport the tensors. -# Currently, the supported backends are "nccl" and "torch_gloo". # - The tensor metadata is a list of tuples, each containing the shape and dtype # of a tensor in the GPU object store. class GPUObjectMeta(NamedTuple): @@ -284,7 +283,7 @@ def add_gpu_object_ref( self, obj_ref: ObjectRef, src_actor: "ray.actor.ActorHandle", - tensor_transport: TensorTransportEnum, + tensor_transport: str, tensor_transport_meta: Optional["TensorTransportMetadata"] = None, ): """Add a GPU object reference to the GPU object manager. This should be @@ -299,12 +298,8 @@ def add_gpu_object_ref( """ from ray.experimental.gpu_object_manager.gpu_object_store import ( __ray_get_tensor_transport_metadata__, - _tensor_transport_to_collective_backend, ) - tensor_transport_backend = _tensor_transport_to_collective_backend( - tensor_transport - ) obj_id = obj_ref.hex() if not tensor_transport_meta: # Submit a Ray actor task to the source actor to get the tensor metadata. @@ -315,14 +310,12 @@ def add_gpu_object_ref( # executing on the main thread blocking this task. 
tensor_meta = src_actor.__ray_call__.options( concurrency_group="_ray_system" - ).remote( - __ray_get_tensor_transport_metadata__, obj_id, tensor_transport_backend - ) + ).remote(__ray_get_tensor_transport_metadata__, obj_id, tensor_transport) else: tensor_meta = tensor_transport_meta self.managed_gpu_object_metadata[obj_id] = GPUObjectMeta( src_actor=src_actor, - tensor_transport_backend=tensor_transport_backend, + tensor_transport_backend=tensor_transport, tensor_transport_meta=tensor_meta, sent_dest_actors=set(), sent_to_src_actor_and_others_warned=False, @@ -335,7 +328,7 @@ def _get_gpu_object_metadata(self, obj_ref: ObjectRef) -> GPUObjectMeta: def _fetch_object( self, obj_id: str, - tensor_transport: TensorTransportEnum = TensorTransportEnum.OBJECT_STORE, + tensor_transport: Optional[str], ): """ Fetches the GPU object from the source actor's GPU object store via the object store @@ -348,6 +341,9 @@ def _fetch_object( Args: obj_id: The object ID of the GPU object. tensor_transport: The tensor transport to use to fetch the GPU object. + This should either be object store or the tensor transport for the RDT object. + If this is None, the tensor transport backend of the RDT object will be used. + Note that NIXL is the only supported RDT tensor transport right now. Returns: None @@ -359,23 +355,33 @@ def _fetch_object( get_tensor_transport_manager, ) - if tensor_transport not in [ - TensorTransportEnum.OBJECT_STORE, - TensorTransportEnum.NIXL, - ]: - raise ValueError( - f"Currently ray.get() only supports OBJECT_STORE and NIXL tensor transport, got {tensor_transport}, please specify the correct tensor transport in ray.get()." - ) - if self.gpu_object_store.has_object(obj_id): return + gpu_object_meta = self.managed_gpu_object_metadata[obj_id] - src_actor = gpu_object_meta.src_actor tensor_transport_backend = gpu_object_meta.tensor_transport_backend + if tensor_transport is None: + tensor_transport = tensor_transport_backend + + from ray.experimental.gpu_object_manager.util import validate_one_sided + + validate_one_sided(tensor_transport, "ray.get") + + if ( + tensor_transport != TensorTransportEnum.OBJECT_STORE.name + and tensor_transport != tensor_transport_backend + ): + raise ValueError( + f"Got {tensor_transport} and object had tensor transport backend {tensor_transport_backend}, " + "please specify the correct tensor transport in ray.get()." + ) + + src_actor = gpu_object_meta.src_actor tensor_transport_manager = get_tensor_transport_manager( tensor_transport_backend ) - if tensor_transport == TensorTransportEnum.OBJECT_STORE: + + if tensor_transport == TensorTransportEnum.OBJECT_STORE.name: tensors = ray.get( src_actor.__ray_call__.options(concurrency_group="_ray_system").remote( __ray_fetch_gpu_object__, obj_id @@ -552,7 +558,7 @@ def trigger_out_of_band_tensor_transfer( def get_gpu_object( self, object_id: str, - tensor_transport: TensorTransportEnum = TensorTransportEnum.OBJECT_STORE, + tensor_transport: Optional[str], ) -> List["torch.Tensor"]: """ Get the GPU object for a given object ID. @@ -610,7 +616,7 @@ def free_object_primary_copy(self, object_id: str): ) def actor_has_tensor_transport( - self, actor: "ray.actor.ActorHandle", tensor_transport: TensorTransportEnum + self, actor: "ray.actor.ActorHandle", tensor_transport: str ): """ Check if the actor has a communicator for the given tensor transport backend. @@ -622,27 +628,17 @@ def actor_has_tensor_transport( Returns: True if the actor has a communicator for the given tensor transport backend, False otherwise. 
""" - # Import get_collective_groups here to avoid dependency on - # collective libraries for default Ray installation. - from ray.experimental.gpu_object_manager.gpu_object_store import ( - _tensor_transport_to_collective_backend, - ) from ray.experimental.gpu_object_manager.util import ( get_tensor_transport_manager, ) - tensor_transport_backend = _tensor_transport_to_collective_backend( - tensor_transport - ) - tensor_transport_manager = get_tensor_transport_manager( - tensor_transport_backend - ) + tensor_transport_manager = get_tensor_transport_manager(tensor_transport) return tensor_transport_manager.actor_has_tensor_transport(actor) def put_object( self, obj_ref: ObjectRef, - tensor_transport: TensorTransportEnum, + tensor_transport: str, tensors: List["torch.Tensor"], ): """ @@ -654,17 +650,11 @@ def put_object( tensors: The tensors to put into the GPU object manager. """ - from ray.experimental.gpu_object_manager.gpu_object_store import ( - _tensor_transport_to_collective_backend, - ) from ray.experimental.gpu_object_manager.util import ( get_tensor_transport_manager, ) - tensor_transport_backend = _tensor_transport_to_collective_backend( - tensor_transport - ) - transport_manager = get_tensor_transport_manager(tensor_transport_backend) + transport_manager = get_tensor_transport_manager(tensor_transport) tensor_transport_meta = transport_manager.extract_tensor_transport_metadata( obj_ref.hex(), tensors ) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index e7db5cf3606f..44d5dcf59e86 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -4,7 +4,6 @@ from typing import Any, Dict, List, Optional, Set, Union import ray -from ray._private.custom_types import TensorTransportEnum from ray._raylet import ObjectRef from ray.experimental.gpu_object_manager.types import ( CommunicatorMetadata, @@ -23,23 +22,6 @@ "Please install torch with 'pip install torch' to use this feature." ) -TENSOR_TRANSPORT_TO_COLLECTIVE_BACKEND = { - TensorTransportEnum.NCCL: "nccl", - TensorTransportEnum.GLOO: "torch_gloo", - TensorTransportEnum.NIXL: "nixl", -} - - -def _tensor_transport_to_collective_backend( - tensor_transport: TensorTransportEnum, -) -> str: - try: - return TENSOR_TRANSPORT_TO_COLLECTIVE_BACKEND[tensor_transport] - except KeyError: - raise ValueError( - f"Invalid tensor transport {tensor_transport.name}, must be one of {list(TENSOR_TRANSPORT_TO_COLLECTIVE_BACKEND.keys())}." - ) - def __ray_get_tensor_transport_metadata__( self, obj_id: str, backend: str diff --git a/python/ray/experimental/gpu_object_manager/nixl_tensor_transport.py b/python/ray/experimental/gpu_object_manager/nixl_tensor_transport.py index 05dd4eb6a182..52aad44c61c2 100644 --- a/python/ray/experimental/gpu_object_manager/nixl_tensor_transport.py +++ b/python/ray/experimental/gpu_object_manager/nixl_tensor_transport.py @@ -47,7 +47,7 @@ def __init__(self, tensor_transport_backend: str): @property def tensor_transport_backend(self) -> str: - return "nixl" + return "NIXL" @staticmethod def is_one_sided() -> bool: @@ -79,7 +79,6 @@ def get_nixl_agent(self): def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool: # TODO(dayshah): This is called on a .remote RDT call, so it's quite expensive. 
- def __ray_actor_has_tensor_transport__( self: "ray.actor.ActorHandle", ) -> bool: @@ -89,7 +88,7 @@ def __ray_actor_has_tensor_transport__( get_tensor_transport_manager, ) - get_tensor_transport_manager("nixl").get_nixl_agent() + get_tensor_transport_manager("NIXL").get_nixl_agent() return True except Exception: return False diff --git a/python/ray/experimental/gpu_object_manager/util.py b/python/ray/experimental/gpu_object_manager/util.py index 1ae94893c3b3..7bb5b8e8c363 100644 --- a/python/ray/experimental/gpu_object_manager/util.py +++ b/python/ray/experimental/gpu_object_manager/util.py @@ -1,6 +1,7 @@ import threading from typing import TYPE_CHECKING +from ray._private.custom_types import TensorTransportEnum from ray.experimental.gpu_object_manager.collective_tensor_transport import ( CollectiveTensorTransport, ) @@ -17,15 +18,15 @@ # Class definitions for transport managers transport_manager_classes: dict[str, TensorTransportManager] = { - "nixl": NixlTensorTransport, - "torch_gloo": CollectiveTensorTransport, - "nccl": CollectiveTensorTransport, + "NIXL": NixlTensorTransport, + "GLOO": CollectiveTensorTransport, + "NCCL": CollectiveTensorTransport, } transport_devices = { - "nixl": ["cuda", "cpu"], - "torch_gloo": ["cpu"], - "nccl": ["cuda"], + "NIXL": ["cuda", "cpu"], + "GLOO": ["cpu"], + "NCCL": ["cuda"], } @@ -70,3 +71,26 @@ def device_match_transport(device: "torch.device", tensor_transport: str) -> boo raise ValueError(f"Unsupported tensor transport protocol: {tensor_transport}") return device.type in transport_devices[tensor_transport] + + +def normalize_and_validate_tensor_transport(tensor_transport: str) -> str: + tensor_transport = tensor_transport.upper() + + if ( + tensor_transport != TensorTransportEnum.OBJECT_STORE.name + and tensor_transport not in transport_manager_classes + ): + raise ValueError(f"Invalid tensor transport: {tensor_transport}") + + return tensor_transport + + +def validate_one_sided(tensor_transport: str, ray_usage_func: str): + if ( + tensor_transport != TensorTransportEnum.OBJECT_STORE.name + and not transport_manager_classes[tensor_transport].is_one_sided() + ): + raise ValueError( + f"Trying to use two-sided tensor transport: {tensor_transport} for {ray_usage_func}. " + "This is only supported for one-sided transports such as NIXL or the OBJECT_STORE." 
+ ) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 1a5c460b950c..e290b7c3b050 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -276,9 +276,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CTensorTransport TENSOR_TRANSPORT_OBJECT_STORE "ray::rpc::TensorTransport::OBJECT_STORE" - cdef CTensorTransport TENSOR_TRANSPORT_NCCL "ray::rpc::TensorTransport::NCCL" - cdef CTensorTransport TENSOR_TRANSPORT_GLOO "ray::rpc::TensorTransport::GLOO" - cdef CTensorTransport TENSOR_TRANSPORT_NIXL "ray::rpc::TensorTransport::NIXL" + cdef CTensorTransport TENSOR_TRANSPORT_DIRECT_TRANSPORT "ray::rpc::TensorTransport::DIRECT_TRANSPORT" cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CPlacementStrategy PLACEMENT_STRATEGY_PACK \ diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index efd5dac0f39e..28bf9d72efb1 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -10,7 +10,6 @@ import ray from ray._common.test_utils import SignalActor, wait_for_condition -from ray._private.custom_types import TensorTransportEnum from ray.experimental.collective import create_collective_group # tensordict is not supported on macos ci, so we skip the tests @@ -519,7 +518,7 @@ def test_trigger_out_of_band_tensor_transfer(ray_start_regular): assert torch.equal(ret_val_src[0], tensor) gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager - gpu_object_manager.add_gpu_object_ref(gpu_ref, src_actor, TensorTransportEnum.GLOO) + gpu_object_manager.add_gpu_object_ref(gpu_ref, src_actor, "GLOO") # Trigger out-of-band tensor transfer from src_actor to dst_actor. task_args = (gpu_ref,) @@ -569,6 +568,17 @@ class InvalidActor: def echo(self, data): return data + actor = GPUTestActor.remote() + with pytest.raises(ValueError, match="Invalid tensor transport"): + actor.double.options(tensor_transport="invalid").remote(torch.randn((1,))) + + with pytest.raises(ValueError, match="Invalid tensor transport"): + ray.put(torch.randn((1,)), _tensor_transport="invalid") + + valid_ref = actor.double.remote(torch.randn((1,))) + with pytest.raises(ValueError, match="Invalid tensor transport"): + ray.get(valid_ref, _tensor_transport="invalid") + @pytest.mark.skipif( not support_tensordict, diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py index 1476512a2f1b..8803da0219eb 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -88,9 +88,7 @@ def create_collective_group( metadata as well. """ backend = types.Backend(backend) - if backend == types.Backend.MPI: - raise RuntimeError("Ray does not support MPI.") - elif backend == types.Backend.GLOO or backend == types.Backend.TORCH_GLOO: + if backend == types.Backend.GLOO: # Rendezvous: ensure a MASTER_ADDR:MASTER_PORT is published in internal_kv. 
metadata_key = _get_master_addr_key(group_name) if rank == 0: @@ -816,9 +814,6 @@ def _check_backend_availability(backend: types.Backend): elif backend == types.Backend.NCCL: if not nccl_available(): raise RuntimeError("NCCL is not available.") - elif backend == types.Backend.TORCH_GLOO: - if not torch_distributed_available(): - raise RuntimeError("torch.distributed is not available.") def _check_inside_actor(): diff --git a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py index d2314c5ea54a..cf06728739c3 100644 --- a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py +++ b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py @@ -106,7 +106,7 @@ def destroy_group(self): @classmethod def backend(cls): """The backend of this collective group.""" - return Backend.TORCH_GLOO + return Backend.GLOO def _check_tensor_input(self, tensor: List["torch.Tensor"]) -> "torch.Tensor": """ray.util.collective wraps tensor arguments in a list. diff --git a/python/ray/util/collective/types.py b/python/ray/util/collective/types.py index bf357bbd24c6..23d43cdae005 100644 --- a/python/ray/util/collective/types.py +++ b/python/ray/util/collective/types.py @@ -34,24 +34,21 @@ def torch_available(): class Backend(object): """A class to represent different backends.""" - NCCL = "nccl" - MPI = "mpi" - # `pygloo` is deprecated. Use gloo through torch.distributed for both - # `GLOO` and `TORCH_GLOO`. - GLOO = "gloo" - # Use gloo through torch.distributed. - TORCH_GLOO = "torch_gloo" - NIXL = "nixl" + NCCL = "NCCL" + GLOO = "GLOO" UNRECOGNIZED = "unrecognized" def __new__(cls, name: str): - backend = getattr(Backend, name.upper(), Backend.UNRECOGNIZED) + upper_name = name.upper() + backend = getattr(Backend, upper_name, Backend.UNRECOGNIZED) if backend == Backend.UNRECOGNIZED: + if upper_name == "TORCH_GLOO": + return Backend.GLOO raise ValueError( - "Unrecognized backend: '{}'. Only NCCL is supported".format(name) + "Unrecognized backend: '{}'. Only NCCL and GLOO are supported".format( + name + ) ) - if backend == Backend.MPI: - raise RuntimeError("Ray does not support MPI backend.") return backend diff --git a/src/ray/core_worker/task_submission/tests/dependency_resolver_test.cc b/src/ray/core_worker/task_submission/tests/dependency_resolver_test.cc index 28c87febf837..83f5c691dfa0 100644 --- a/src/ray/core_worker/task_submission/tests/dependency_resolver_test.cc +++ b/src/ray/core_worker/task_submission/tests/dependency_resolver_test.cc @@ -465,7 +465,7 @@ TEST(LocalDependencyResolverTest, TestMixedTensorTransport) { LocalDependencyResolver resolver( *store, *task_manager, actor_creator, [&](const ObjectID &object_id) { if (object_id == obj1) { - return rpc::TensorTransport::NCCL; + return rpc::TensorTransport::DIRECT_TRANSPORT; } return rpc::TensorTransport::OBJECT_STORE; }); diff --git a/src/ray/core_worker/tests/task_manager_test.cc b/src/ray/core_worker/tests/task_manager_test.cc index ec4cc29f3d7a..dfcf9c42ef29 100644 --- a/src/ray/core_worker/tests/task_manager_test.cc +++ b/src/ray/core_worker/tests/task_manager_test.cc @@ -64,7 +64,7 @@ TaskSpecification CreateTaskHelper(uint64_t num_returns, if (enable_tensor_transport) { // Currently, only actors support transferring tensors out-of-band. 
task.GetMutableMessage().set_type(TaskType::ACTOR_TASK); - tensor_transport = rpc::TensorTransport::NCCL; + tensor_transport = rpc::TensorTransport::DIRECT_TRANSPORT; } task.GetMutableMessage().set_tensor_transport(tensor_transport); @@ -2807,7 +2807,7 @@ TEST_F(TaskManagerTest, TestGPUObjectTaskSuccess) { ObjectID gpu_obj_ref = ObjectID::FromRandom(); auto *arg = spec.GetMutableMessage().add_args(); arg->set_is_inlined(false); - arg->set_tensor_transport(rpc::TensorTransport::NCCL); + arg->set_tensor_transport(rpc::TensorTransport::DIRECT_TRANSPORT); arg->mutable_object_ref()->set_object_id(gpu_obj_ref.Binary()); // `gpu_obj_ref` should have a local reference when the sender actor diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 82d6b66f1c92..1bb541e7ec4c 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -732,12 +732,8 @@ message ObjectReferenceCount { enum TensorTransport { // Use the default object store for tensor transport. OBJECT_STORE = 0; - // Use NCCL for tensor transport. - NCCL = 1; - // Use GLOO for tensor transport. - GLOO = 2; - // Use NIXL for tensor transport. - NIXL = 3; + // Use Ray Direct Transport to transfer tensors directly between workers. + DIRECT_TRANSPORT = 1; } // Argument in the task.
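Below is a minimal usage sketch of the string-based tensor transport API after this change. The Worker actor and its methods are illustrative only and not part of this diff; transport strings are normalized to upper case by normalize_and_validate_tensor_transport, and validate_one_sided restricts ray.put()/ray.get() to one-sided transports (NIXL or the object store).

import ray
import torch

from ray.experimental.collective import create_collective_group


@ray.remote(enable_tensor_transport=True)
class Worker:
    # Stored internally as the normalized string "GLOO".
    @ray.method(tensor_transport="gloo")
    def make(self):
        # GLOO is a CPU transport, so keep the tensor on the CPU.
        return torch.ones(2, 2)

    def consume(self, t):
        return t.sum().item()


ray.init()
sender, receiver = Worker.remote(), Worker.remote()

# GLOO is a collective (two-sided) transport, so a communicator must exist
# before actor tasks can use it.
create_collective_group([sender, receiver], backend="gloo")

# The returned tensor stays on the sender; the ObjectRef is a handle to it.
ref = sender.make.remote()

# Passing the ref to another actor task transfers the tensor out-of-band
# over GLOO instead of through the object store.
assert ray.get(receiver.consume.remote(ref)) == 4.0

# ray.get()/ray.put() only accept one-sided transports or the object store:
tensor = ray.get(ref, _tensor_transport="object_store")  # fetch a copy via the object store
# ray.get(ref, _tensor_transport="gloo")     # raises: two-sided transport in ray.get
# ray.put(tensor, _tensor_transport="bogus") # raises ValueError (invalid transport)

The Backend normalization in ray/util/collective/types.py keeps the legacy "torch_gloo" spelling as an alias for GLOO, so existing callers should keep working; a sketch of the expected behavior:

from ray.util.collective.types import Backend

assert Backend("gloo") == Backend.GLOO == "GLOO"
assert Backend("torch_gloo") == Backend.GLOO   # legacy alias maps to GLOO
assert Backend("nccl") == Backend.NCCL == "NCCL"
# Backend("mpi") now raises ValueError instead of a dedicated MPI error.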