Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ def _from_bridge_msg(self):

def _from_raw_msg(self):
image_size = len(self.raw_msg.data)
err, device_ptr = runtime.cudaMalloc(image_size)
self.ASSERT_CUDA_SUCCESS(err)
device_ptr = self._acquire_raw_cuda_buffer(image_size)
err, = runtime.cudaMemcpy(device_ptr, self.raw_msg.data, image_size,
runtime.cudaMemcpyKind.cudaMemcpyHostToDevice)
self.ASSERT_CUDA_SUCCESS(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from ctypes import c_long
import math
import os
import atexit
import threading
import sys

import cuda.bindings.driver as driver
import cuda.bindings.runtime as runtime
Expand All @@ -34,6 +37,75 @@
from sensor_msgs.msg import PointCloud2


def _env_flag(name: str) -> bool:
    """Return True when environment variable *name* holds an enabling value.

    The value is stripped and lower-cased, then matched against
    "1", "true", "yes" and "on"; an unset variable reads as "0".
    """
    return os.environ.get(name, "0").strip().lower() in ("1", "true", "yes", "on")


# Off by default: serve raw CUDA buffers from the module-level pool.
_REUSE_RAW_CUDA_BUFFER = _env_flag("PYNITROS_REUSE_RAW_CUDA_BUFFER")
# Off by default: report the allocation/reuse counters below.
_RAW_CUDA_BUFFER_RECEIPTS = _env_flag("PYNITROS_RAW_CUDA_BUFFER_RECEIPTS")

# Pool bookkeeping shared by the _raw_pool_* helpers.
_raw_pool_lock = threading.Lock()
_raw_pool: list[tuple[int, int]] = []  # (device_ptr, capacity_bytes)
_raw_pool_in_use: set[int] = set()
_raw_pool_alloc_calls = 0
_raw_pool_reuse_calls = 0


def _raw_pool_acquire(min_bytes: int) -> int:
    """Hand out a pooled CUDA device buffer of at least ``min_bytes`` bytes.

    The smallest idle buffer that is large enough is reused. When none
    fits, a new buffer is allocated with ``cudaMalloc`` and added to the
    pool. The returned pointer stays marked as in use until it is passed
    to ``_raw_pool_release``.

    Args:
        min_bytes: Minimum capacity required, in bytes.

    Returns:
        The device pointer as a plain ``int``.

    Raises:
        RuntimeError: If ``cudaMalloc`` reports an error.
    """
    global _raw_pool_alloc_calls, _raw_pool_reuse_calls
    with _raw_pool_lock:
        # Best fit instead of first fit: giving a large idle buffer to a
        # small request would force the next large request to allocate a
        # second large buffer that then lives in the pool permanently.
        idle = [
            (cap, ptr)
            for ptr, cap in _raw_pool
            if cap >= min_bytes and ptr not in _raw_pool_in_use
        ]
        if idle:
            _cap, ptr = min(idle)
            _raw_pool_in_use.add(ptr)
            _raw_pool_reuse_calls += 1
            return ptr

        # Nothing suitable is idle: allocate while still holding the lock so
        # the pool, the in-use set and the counter are updated together.
        err, device_ptr = runtime.cudaMalloc(min_bytes)
        if err != runtime.cudaError_t.cudaSuccess:
            raise RuntimeError(f"cudaMalloc failed: {err}")
        ptr_int = int(device_ptr)
        _raw_pool.append((ptr_int, int(min_bytes)))
        _raw_pool_in_use.add(ptr_int)
        _raw_pool_alloc_calls += 1
        return ptr_int


def _raw_pool_release(ptr: int) -> None:
    """Mark pooled buffer ``ptr`` as idle so a later acquire can reuse it.

    The device memory is not freed here. A pointer that is not currently
    marked as in use is ignored.
    """
    key = int(ptr)
    with _raw_pool_lock:
        _raw_pool_in_use.discard(key)


def _raw_pool_shutdown() -> None:
    """Free every pooled buffer and, if enabled, print the usage counters.

    Meant to run as an exit hook, so everything here is best-effort: any
    exception raised while freeing or printing is ignored.
    """
    # Empty the pool under the lock, then free the buffers afterwards.
    with _raw_pool_lock:
        doomed = [entry[0] for entry in _raw_pool]
        _raw_pool.clear()
        _raw_pool_in_use.clear()

    for device_ptr in doomed:
        try:
            runtime.cudaFree(device_ptr)
        except Exception:
            # CUDA may be uninitialized or already torn down at this point.
            pass

    if not _RAW_CUDA_BUFFER_RECEIPTS:
        return

    try:
        print(
            "pynitros raw cuda buffer receipts: "
            f"alloc_calls={_raw_pool_alloc_calls} reuse_calls={_raw_pool_reuse_calls}",
            file=sys.stderr,
        )
    except Exception:
        # stderr may be unusable this late in interpreter shutdown.
        pass


# Install the exit hook only when pooling or receipt reporting was requested.
if _RAW_CUDA_BUFFER_RECEIPTS or _REUSE_RAW_CUDA_BUFFER:
    atexit.register(_raw_pool_shutdown)


class PyNitrosTypeViewBase():
"""
Base class of PyNITROS message view.
Expand Down Expand Up @@ -189,8 +261,22 @@ def postprocess(self):
self._cpu_shared_mem.cpu_shared_mem_obj.close()
self._cpu_shared_mem.cpu_shared_mem.close_fd()
else:
# Free the memory
runtime.cudaFree(self.gpu_ptr)
# Free (or recycle) the memory for raw ROS messages.
if _REUSE_RAW_CUDA_BUFFER and getattr(self, "_pynitros_raw_cuda_pool", False):
_raw_pool_release(int(self.gpu_ptr))
else:
runtime.cudaFree(self.gpu_ptr)

def _acquire_raw_cuda_buffer(self, size_bytes: int) -> int:
    """Return a device pointer to a CUDA buffer of at least ``size_bytes``.

    With buffer reuse enabled the pointer comes from the module-level pool
    and this view is flagged so that postprocess() recycles the buffer
    instead of freeing it. Otherwise a fresh buffer is allocated with
    ``cudaMalloc`` and checked with ``ASSERT_CUDA_SUCCESS``.
    """
    if _REUSE_RAW_CUDA_BUFFER:
        pooled_ptr = _raw_pool_acquire(int(size_bytes))
        # Flag read by postprocess() to choose recycle over cudaFree.
        self._pynitros_raw_cuda_pool = True
        return int(pooled_ptr)

    err, fresh_ptr = runtime.cudaMalloc(size_bytes)
    self.ASSERT_CUDA_SUCCESS(err)
    return int(fresh_ptr)

def get_handle(self):
    """Return the ``handle`` attribute of this view."""
    return self.handle
Expand Down