Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 120 additions & 51 deletions algorithms/cudahip/Reduction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <math.h>

namespace device {

constexpr int BlockSize = 1024;
constexpr int ItemsPerThread = 4;
template <typename T>
struct Sum {
T defaultValue{0};
Expand Down Expand Up @@ -44,70 +47,122 @@ __forceinline__ __device__ T shuffledown(T value, int offset) {
}
#endif

// a rather "dumb", but general reduction kernel
// (not intended for intensive use; there's the thrust libraries instead)
// Warp reduce operation similar to SYCL
template <typename T, typename OperationT>
__device__ __forceinline__ T warpReduce(T value, OperationT operation) {

template <typename AccT, typename VecT, typename OperationT>
__launch_bounds__(1024) void __global__ kernel_reduce(
AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) {
__shared__ AccT shmem[256];
const auto warpCount = blockDim.x / warpSize;
const auto currentWarp = threadIdx.x / warpSize;
const auto threadInWarp = threadIdx.x % warpSize;
const auto warpsNeeded = (size + warpSize - 1) / warpSize;

auto value = operation.defaultValue;
auto acc = operation.defaultValue;

#pragma unroll 4
for (std::size_t i = currentWarp; i < warpsNeeded; i += warpCount) {
const auto id = threadInWarp + i * warpSize;
const auto valueNew =
(id < size) ? static_cast<AccT>(ntload(&vector[id])) : operation.defaultValue;

value = operation(value, valueNew);
#if (defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 800)
// C++17 compile-time check to ensure we only use this for ints
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, unsigned int>) {

unsigned int mask = 0xFFFFFFFF; // 32-bit active thread mask

if constexpr (std::is_same_v<OperationT, device::Sum<T>>) {
return __reduce_add_sync(mask, value);
} else if constexpr (std::is_same_v<OperationT, device::Min<T>>) {
return __reduce_min_sync(mask, value);
} else if constexpr (std::is_same_v<OperationT, device::Max<T>>) {
return __reduce_max_sync(mask, value);
}
}
#endif

for (int offset = 1; offset < warpSize; offset *= 2) {
for (int offset = warpSize / 2; offset > 0; offset /= 2) {
value = operation(value, shuffledown(value, offset));
Copy link
Contributor

@davschneller davschneller Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a side note; you could use cub in the CUDA case here. (there are sometimes, e.g. for min/max more advanced reduction functions starting with Ampere) That should come with CUDA included.

For the AMD case, there should be an equivalent library (which might require explicit linking, however) — or you could use the OCKL functions (as long as they exist). See here for a forward as it's used: https://github.com/ROCm/clr/blob/b90c29358c694e66ea78cb1e3957edad09f35cbf/hipamd/include/hip/amd_detail/amd_warp_sync_functions.h#L57-L92 (you can almost just take a copy of that declaration code for the relevant functions—or try to use the HIP functions below ... EDIT: found the docs: https://rocm.docs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html )

... though I'm not sure if there's much speedup to gain in either case.
And it could also be put in some future PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if I understand it correctly, and in https://rocm.docs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html -- I found the __reduce functions for warps only for int or unsigned int types. So, I added these if conditions, I am unsure how benificial they would be. If you suggest removing this, and keeping the manual ones, I will remove them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if I understand it correctly, and in https://rocm.docs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html -- I found the __reduce functions for warps only for int or unsigned int types. So, I added these if conditions, I am unsure how benificial they would be. If you suggest removing this, and keeping the manual ones, I will remove them.

Surprisingly, these functions are not recognized by HIP in the CI, and they fail, while the NVIDIA tests pass silently. I am a bit confused, but for now, I removed this for HIP too. If there is a particular way to get this done, please let me know. :)

}
return value;
}

acc = operation(acc, value);
// Helper function for Generic Atomic Update
// Fallback to atomicCAS-based implementation if atomic instruction is not available
// Picked from: https://docs.nvidia.com/cuda/cuda-c-programming-guide/#atomic-functions
template <typename T, typename OperationT>
__device__ __forceinline__ void atomicUpdate(T* address, T val, OperationT operation) {
unsigned long long* address_as_ull = (unsigned long long*)address;
unsigned long long old = *address_as_ull, assumed;
do {
assumed = old;
T calculatedRes = operation(*(T*)&assumed, val);
old = atomicCAS(address_as_ull, assumed, *(unsigned long long*)&calculatedRes);
} while (assumed != old);
}

if (threadInWarp == 0) {
shmem[currentWarp] = acc;
}
// Native atomics
template <>
__device__ __forceinline__ void
atomicUpdate<int, device::Sum<int>>(int* address, int val, device::Sum<int> operation) {
atomicAdd(address, val);
}
template <>
__device__ __forceinline__ void atomicUpdate<float, device::Sum<float>>(
float* address, float val, device::Sum<float> operation) {
atomicAdd(address, val);
}
#if __CUDA_ARCH__ >= 600
template <>
__device__ __forceinline__ void atomicUpdate<double, device::Sum<double>>(
double* address, double val, device::Sum<double> operation) {
atomicAdd(address, val);
}
#endif

// Block Reduce
template <typename T, typename OperationT>
__device__ __forceinline__ T blockReduce(T val, T* shmem, OperationT operation) {

const int laneId = threadIdx.x % warpSize;
const int warpId = threadIdx.x / warpSize;

val = warpReduce(val, operation);
if (laneId == 0) {
shmem[warpId] = val;
Copy link
Contributor

@davschneller davschneller Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle, you could already use the atomics here.
That's most likely how e.g. SYCL did it internally (or the respective OCKL reductions would do it, or maybe cub has something as well—though I'm not sure how public those are).

(though IIRC there was a performance regression for FP32 on the MI250X in LDS, and only there with that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out. However, given that exact LDS FP32 regression you mentioned on the MI250X, I think sticking to the two-pass shuffle (Warp 0 sweeping the shared memory) seems like a safer bet for now. But if you insist on implementing it that way, I can try in a different PR, but I will need to benchmark the differences, and I don't have access to LUMI as of now.

}
__syncthreads();

if (currentWarp == 0) {
const auto lastWarpsNeeded = (warpCount + warpSize - 1) / warpSize;
const int numWarps = BlockSize / warpSize;
val = (threadIdx.x < numWarps) ? shmem[laneId] : operation.defaultValue;

auto value = operation.defaultValue;
auto lastAcc = operation.defaultValue;
if (warpId == 0) {
val = warpReduce(val, operation);
}

#pragma unroll 2
for (int i = 0; i < lastWarpsNeeded; ++i) {
const auto id = threadInWarp + i * warpSize;
const auto valueNew = (id < warpCount) ? shmem[id] : operation.defaultValue;
return val;
}

value = operation(value, valueNew);
}
// Init Kernel to handle overrideResult safely across multiple blocks
template <typename T, typename OperationT>
__global__ void initKernel(T* result, OperationT operation) {
if (threadIdx.x == 0) {
*result = operation.defaultValue;
}
}

for (int offset = 1; offset < warpSize; offset *= 2) {
value = operation(value, shuffledown(value, offset));
}
template <typename AccT, typename VecT, typename OperationT>
__launch_bounds__(BlockSize) void __global__ kernel_reduce(
AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) {

// Maximum block size 1024, warp size 32 so 1024/32 = 32 chosen
// For AMD, warp size 64, 1024/64 = 16, but 32 should work with a few idle memory addresses
__shared__ AccT shmem[32];

lastAcc = operation(lastAcc, value);
AccT threadAcc = operation.defaultValue;
size_t blockBaseIdx = blockIdx.x * (BlockSize * ItemsPerThread);
size_t threadBaseIdx = blockBaseIdx + threadIdx.x;

if (threadIdx.x == 0) {
if (overrideResult) {
ntstore(result, lastAcc);
} else {
ntstore(result, operation(ntload(result), lastAcc));
}
#pragma unroll
for (int i = 0; i < ItemsPerThread; i++) {
size_t idx = threadBaseIdx + i * BlockSize;
if (idx < size) {
threadAcc = operation(threadAcc, static_cast<AccT>(ntload(&vector[idx])));
}
}

AccT blockAcc = blockReduce<AccT, OperationT>(threadAcc, shmem, operation);

if (threadIdx.x == 0) {
(void)overrideResult; // to silence unused parameter warning for non-Add reductions
atomicUpdate(result, blockAcc, operation);
}
}

template <typename AccT, typename VecT>
Expand All @@ -119,22 +174,36 @@ void Algorithms::reduceVector(AccT* result,
void* streamPtr) {
auto* stream = reinterpret_cast<internals::DeviceStreamT>(streamPtr);

dim3 grid(1, 1, 1);
dim3 block(1024, 1, 1);
const size_t totalItems = BlockSize * ItemsPerThread;
const size_t numBlocks = (size + totalItems - 1) / totalItems;

if (overrideResult) {
switch (type) {
case ReductionType::Add:
initKernel<<<1, 1, 0, stream>>>(result, device::Sum<AccT>());
break;
case ReductionType::Max:
initKernel<<<1, 1, 0, stream>>>(result, device::Max<AccT>());
break;
case ReductionType::Min:
initKernel<<<1, 1, 0, stream>>>(result, device::Min<AccT>());
break;
}
}

switch (type) {
case ReductionType::Add: {
kernel_reduce<<<grid, block, 0, stream>>>(
kernel_reduce<<<numBlocks, BlockSize, 0, stream>>>(
result, buffer, size, overrideResult, device::Sum<AccT>());
break;
}
case ReductionType::Max: {
kernel_reduce<<<grid, block, 0, stream>>>(
kernel_reduce<<<numBlocks, BlockSize, 0, stream>>>(
result, buffer, size, overrideResult, device::Max<AccT>());
break;
}
case ReductionType::Min: {
kernel_reduce<<<grid, block, 0, stream>>>(
kernel_reduce<<<numBlocks, BlockSize, 0, stream>>>(
result, buffer, size, overrideResult, device::Min<AccT>());
break;
}
Expand Down