-
Notifications
You must be signed in to change notification settings - Fork 1
Modified CUDA/HIP reduction algorithm #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,9 @@ | |
| #include <math.h> | ||
|
|
||
| namespace device { | ||
|
|
||
| constexpr int BlockSize = 1024; | ||
| constexpr int ItemsPerThread = 4; | ||
| template <typename T> | ||
| struct Sum { | ||
| T defaultValue{0}; | ||
|
|
@@ -44,70 +47,122 @@ __forceinline__ __device__ T shuffledown(T value, int offset) { | |
| } | ||
| #endif | ||
|
|
||
| // a rather "dumb", but general reduction kernel | ||
| // (not intended for intensive use; there's the thrust libraries instead) | ||
| // Warp reduce operation similar to SYCL | ||
| template <typename T, typename OperationT> | ||
| __device__ __forceinline__ T warpReduce(T value, OperationT operation) { | ||
|
|
||
| template <typename AccT, typename VecT, typename OperationT> | ||
| __launch_bounds__(1024) void __global__ kernel_reduce( | ||
| AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) { | ||
| __shared__ AccT shmem[256]; | ||
| const auto warpCount = blockDim.x / warpSize; | ||
| const auto currentWarp = threadIdx.x / warpSize; | ||
| const auto threadInWarp = threadIdx.x % warpSize; | ||
| const auto warpsNeeded = (size + warpSize - 1) / warpSize; | ||
|
|
||
| auto value = operation.defaultValue; | ||
| auto acc = operation.defaultValue; | ||
|
|
||
| #pragma unroll 4 | ||
| for (std::size_t i = currentWarp; i < warpsNeeded; i += warpCount) { | ||
| const auto id = threadInWarp + i * warpSize; | ||
| const auto valueNew = | ||
| (id < size) ? static_cast<AccT>(ntload(&vector[id])) : operation.defaultValue; | ||
|
|
||
| value = operation(value, valueNew); | ||
| #if (defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 800) | ||
| // C++17 compile-time check to ensure we only use this for ints | ||
| if constexpr (std::is_same_v<T, int> || std::is_same_v<T, unsigned int>) { | ||
|
|
||
| unsigned int mask = 0xFFFFFFFF; // 32-bit active thread mask | ||
|
|
||
| if constexpr (std::is_same_v<OperationT, device::Sum<T>>) { | ||
| return __reduce_add_sync(mask, value); | ||
| } else if constexpr (std::is_same_v<OperationT, device::Min<T>>) { | ||
| return __reduce_min_sync(mask, value); | ||
| } else if constexpr (std::is_same_v<OperationT, device::Max<T>>) { | ||
| return __reduce_max_sync(mask, value); | ||
| } | ||
| } | ||
| #endif | ||
|
|
||
| for (int offset = 1; offset < warpSize; offset *= 2) { | ||
| for (int offset = warpSize / 2; offset > 0; offset /= 2) { | ||
| value = operation(value, shuffledown(value, offset)); | ||
| } | ||
| return value; | ||
| } | ||
|
|
||
| acc = operation(acc, value); | ||
| // Helper function for Generic Atomic Update | ||
| // Fallback to atomicCAS-based implementation if atomic instruction is not available | ||
| // Picked from: https://docs.nvidia.com/cuda/cuda-c-programming-guide/#atomic-functions | ||
| template <typename T, typename OperationT> | ||
| __device__ __forceinline__ void atomicUpdate(T* address, T val, OperationT operation) { | ||
| unsigned long long* address_as_ull = (unsigned long long*)address; | ||
| unsigned long long old = *address_as_ull, assumed; | ||
| do { | ||
| assumed = old; | ||
| T calculatedRes = operation(*(T*)&assumed, val); | ||
| old = atomicCAS(address_as_ull, assumed, *(unsigned long long*)&calculatedRes); | ||
| } while (assumed != old); | ||
| } | ||
|
|
||
| if (threadInWarp == 0) { | ||
| shmem[currentWarp] = acc; | ||
| } | ||
| // Native atomics | ||
| template <> | ||
| __device__ __forceinline__ void | ||
| atomicUpdate<int, device::Sum<int>>(int* address, int val, device::Sum<int> operation) { | ||
| atomicAdd(address, val); | ||
| } | ||
| template <> | ||
| __device__ __forceinline__ void atomicUpdate<float, device::Sum<float>>( | ||
| float* address, float val, device::Sum<float> operation) { | ||
| atomicAdd(address, val); | ||
| } | ||
| #if __CUDA_ARCH__ >= 600 | ||
| template <> | ||
| __device__ __forceinline__ void atomicUpdate<double, device::Sum<double>>( | ||
| double* address, double val, device::Sum<double> operation) { | ||
| atomicAdd(address, val); | ||
| } | ||
| #endif | ||
|
|
||
| // Block Reduce | ||
| template <typename T, typename OperationT> | ||
| __device__ __forceinline__ T blockReduce(T val, T* shmem, OperationT operation) { | ||
|
|
||
| const int laneId = threadIdx.x % warpSize; | ||
| const int warpId = threadIdx.x / warpSize; | ||
|
|
||
| val = warpReduce(val, operation); | ||
| if (laneId == 0) { | ||
| shmem[warpId] = val; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In principle, you could already use the atomics here. (though IIRC there was a performance regression for FP32 on the MI250X in LDS, and only there with that)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing it out. However, given that exact LDS FP32 regression you mentioned on the MI250X, I think sticking to the two-pass shuffle (Warp 0 sweeping the shared memory) seems like a safer bet for now. But if you insist on implementing it that way, I can try in a different PR, but I will need to benchmark the differences, and I don't have access to LUMI as of now. |
||
| } | ||
| __syncthreads(); | ||
|
|
||
| if (currentWarp == 0) { | ||
| const auto lastWarpsNeeded = (warpCount + warpSize - 1) / warpSize; | ||
| const int numWarps = BlockSize / warpSize; | ||
| val = (threadIdx.x < numWarps) ? shmem[laneId] : operation.defaultValue; | ||
|
|
||
| auto value = operation.defaultValue; | ||
| auto lastAcc = operation.defaultValue; | ||
| if (warpId == 0) { | ||
| val = warpReduce(val, operation); | ||
| } | ||
|
|
||
| #pragma unroll 2 | ||
| for (int i = 0; i < lastWarpsNeeded; ++i) { | ||
| const auto id = threadInWarp + i * warpSize; | ||
| const auto valueNew = (id < warpCount) ? shmem[id] : operation.defaultValue; | ||
| return val; | ||
| } | ||
|
|
||
| value = operation(value, valueNew); | ||
| } | ||
| // Init Kernel to handle overrideResult safely across multiple blocks | ||
| template <typename T, typename OperationT> | ||
| __global__ void initKernel(T* result, OperationT operation) { | ||
| if (threadIdx.x == 0) { | ||
| *result = operation.defaultValue; | ||
| } | ||
| } | ||
|
|
||
| for (int offset = 1; offset < warpSize; offset *= 2) { | ||
| value = operation(value, shuffledown(value, offset)); | ||
| } | ||
| template <typename AccT, typename VecT, typename OperationT> | ||
| __launch_bounds__(BlockSize) void __global__ kernel_reduce( | ||
| AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) { | ||
|
|
||
| // Maximum block size 1024, warp size 32 so 1024/32 = 32 chosen | ||
| // For AMD, warp size 64, 1024/64 = 16, but 32 should work with a few idle memory addresses | ||
vikaskurapati marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| __shared__ AccT shmem[32]; | ||
|
|
||
| lastAcc = operation(lastAcc, value); | ||
| AccT threadAcc = operation.defaultValue; | ||
| size_t blockBaseIdx = blockIdx.x * (BlockSize * ItemsPerThread); | ||
| size_t threadBaseIdx = blockBaseIdx + threadIdx.x; | ||
|
|
||
| if (threadIdx.x == 0) { | ||
| if (overrideResult) { | ||
| ntstore(result, lastAcc); | ||
| } else { | ||
| ntstore(result, operation(ntload(result), lastAcc)); | ||
| } | ||
| #pragma unroll | ||
| for (int i = 0; i < ItemsPerThread; i++) { | ||
| size_t idx = threadBaseIdx + i * BlockSize; | ||
| if (idx < size) { | ||
| threadAcc = operation(threadAcc, static_cast<AccT>(ntload(&vector[idx]))); | ||
| } | ||
| } | ||
|
|
||
| AccT blockAcc = blockReduce<AccT, OperationT>(threadAcc, shmem, operation); | ||
|
|
||
| if (threadIdx.x == 0) { | ||
| (void)overrideResult; // to silence unused parameter warning for non-Add reductions | ||
| atomicUpdate(result, blockAcc, operation); | ||
| } | ||
| } | ||
|
|
||
| template <typename AccT, typename VecT> | ||
|
|
@@ -119,22 +174,36 @@ void Algorithms::reduceVector(AccT* result, | |
| void* streamPtr) { | ||
| auto* stream = reinterpret_cast<internals::DeviceStreamT>(streamPtr); | ||
|
|
||
| dim3 grid(1, 1, 1); | ||
| dim3 block(1024, 1, 1); | ||
| const size_t totalItems = BlockSize * ItemsPerThread; | ||
| const size_t numBlocks = (size + totalItems - 1) / totalItems; | ||
|
|
||
| if (overrideResult) { | ||
| switch (type) { | ||
| case ReductionType::Add: | ||
| initKernel<<<1, 1, 0, stream>>>(result, device::Sum<AccT>()); | ||
| break; | ||
| case ReductionType::Max: | ||
| initKernel<<<1, 1, 0, stream>>>(result, device::Max<AccT>()); | ||
| break; | ||
| case ReductionType::Min: | ||
| initKernel<<<1, 1, 0, stream>>>(result, device::Min<AccT>()); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| switch (type) { | ||
| case ReductionType::Add: { | ||
| kernel_reduce<<<grid, block, 0, stream>>>( | ||
| kernel_reduce<<<numBlocks, BlockSize, 0, stream>>>( | ||
| result, buffer, size, overrideResult, device::Sum<AccT>()); | ||
| break; | ||
| } | ||
| case ReductionType::Max: { | ||
| kernel_reduce<<<grid, block, 0, stream>>>( | ||
| kernel_reduce<<<numBlocks, BlockSize, 0, stream>>>( | ||
| result, buffer, size, overrideResult, device::Max<AccT>()); | ||
| break; | ||
| } | ||
| case ReductionType::Min: { | ||
| kernel_reduce<<<grid, block, 0, stream>>>( | ||
| kernel_reduce<<<numBlocks, BlockSize, 0, stream>>>( | ||
| result, buffer, size, overrideResult, device::Min<AccT>()); | ||
| break; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as a side note; you could use
cubin the CUDA case here. (there are sometimes, e.g. for min/max more advanced reduction functions starting with Ampere) That should come with CUDA included.For the AMD case, there should be an equivalent library (which might require explicit linking, however) — or you could use the OCKL functions (as long as they exist). See here for a forward as it's used: https://github.com/ROCm/clr/blob/b90c29358c694e66ea78cb1e3957edad09f35cbf/hipamd/include/hip/amd_detail/amd_warp_sync_functions.h#L57-L92 (you can almost just take a copy of that declaration code for the relevant functions—or try to use the HIP functions below ... EDIT: found the docs: https://rocm.docs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html )
... though I'm not sure if there's much speedup to gain in either case.
And it could also be put in some future PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if I understand it correctly, and in https://rocm.docs.amd.com/projects/HIP/en/latest/how-to/hip_cpp_language_extensions.html -- I found the __reduce functions for warps only for int or unsigned int types. So, I added these if conditions, I am unsure how benificial they would be. If you suggest removing this, and keeping the manual ones, I will remove them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surprisingly, these functions are not recognized by HIP in the CI, and they fail, while the NVIDIA tests pass silently. I am a bit confused, but for now, I removed this for HIP too. If there is a particular way to get this done, please let me know. :)