From a42941703c07da2225dfa8d17618cbcc19d771c3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 10 Apr 2026 10:24:09 -0600 Subject: [PATCH 1/6] feat: add fair_unified_task_shared memory pool to fix 2x memory allocation When Comet executes a shuffle, it creates two separate native plans (the child plan and the shuffle writer plan) that run concurrently in a pipelined fashion. Previously, each plan got its own memory pool at the full per-task limit, effectively allowing 2x the intended memory to be consumed. The new `fair_unified_task_shared` pool type shares a single CometFairMemoryPool across all native plans within the same Spark task. This ensures the total memory stays within the per-task limit while dynamically distributing memory among operators based on how many register as memory consumers (e.g. if the child plan is a simple scan+filter, the shuffle writer gets 100% of the pool). This is now the default for off-heap mode. Closes #3921 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/scala/org/apache/comet/CometConf.scala | 8 ++++++-- docs/source/user-guide/latest/tuning.md | 13 ++++++++++--- native/core/src/execution/memory_pools/config.rs | 8 +++++++- native/core/src/execution/memory_pools/mod.rs | 16 ++++++++++++++++ 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..223b6f52eb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -647,10 +647,14 @@ object CometConf extends ShimCometConf { .category(CATEGORY_TUNING) .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + - "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " + + "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " + + "`fair_unified_task_shared`. 
The `fair_unified_task_shared` pool is shared across " + + "all native plans within the same Spark task, ensuring that the total memory " + + "consumption does not exceed the per-task limit even when multiple native plans " + + "(e.g. a shuffle writer and its child plan) execute concurrently. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified") + .createWithDefault("fair_unified_task_shared") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..54e34d5b83 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -58,13 +58,20 @@ Comet implements multiple memory pool implementations. The type of pool can be s The valid pool types are: -- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified` - `greedy_unified` -The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory +The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the +same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple +native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own +pool at the full per-task limit, effectively doubling the memory that can be consumed. + +The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. 
Sometimes it will cause spills even -when there is sufficient memory in order to leave enough memory for other operators. +when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool +type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index d30126a99a..4f2c03ecbb 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -21,6 +21,7 @@ use crate::errors::{CometError, CometResult}; pub(crate) enum MemoryPoolType { GreedyUnified, FairUnified, + FairUnifiedTaskShared, Greedy, FairSpill, GreedyTaskShared, @@ -34,7 +35,9 @@ impl MemoryPoolType { pub(crate) fn is_task_shared(&self) -> bool { matches!( self, - MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared + MemoryPoolType::GreedyTaskShared + | MemoryPoolType::FairSpillTaskShared + | MemoryPoolType::FairUnifiedTaskShared ) } } @@ -63,6 +66,9 @@ pub(crate) fn parse_memory_pool_config( let memory_pool_config = if off_heap_mode { match memory_pool_type.as_str() { "fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size), + "fair_unified_task_shared" => { + MemoryPoolConfig::new(MemoryPoolType::FairUnifiedTaskShared, pool_size) + } "greedy_unified" => { // the `unified` memory pool interacts with Spark's memory pool to allocate // memory therefore does not need a size to be explicitly set. 
The pool size diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index d44290d058..cfb708b58f 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -87,6 +87,22 @@ pub(crate) fn create_memory_pool( }); Arc::clone(memory_pool) } + MemoryPoolType::FairUnifiedTaskShared => { + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new( + CometFairMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + memory_pool_config.pool_size, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) + } MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => { let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); let per_task_memory_pool = From e213af6238daeeae1546014e9fd5513e2980a62c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 09:18:01 -0600 Subject: [PATCH 2/6] fix: deduplicate task-shared pools in tracing memory accounting When using fair_unified_task_shared, multiple execution contexts on the same thread share a single Arc<dyn MemoryPool>. The tracing code was summing pool.reserved() for each registered context, double-counting the shared pool and reporting 2x the actual memory reservation. Deduplicate pools by Arc data pointer before summing so each underlying pool is only counted once. 
--- native/core/src/execution/jni_api.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 93f75bae96..eee80ff2d9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -141,7 +141,12 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { map.remove(&thread_id); return 0; } - return pools.values().map(|p| p.reserved()).sum::<usize>(); + let mut seen = std::collections::HashSet::new(); + return pools + .values() + .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) + .map(|p| p.reserved()) + .sum::<usize>(); } 0 } @@ -149,7 +154,16 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { fn total_reserved_for_thread(thread_id: u64) -> usize { let map = get_thread_memory_pools().lock(); map.get(&thread_id) - .map(|pools| pools.values().map(|p| p.reserved()).sum::<usize>()) + .map(|pools| { + // Deduplicate pools that share the same underlying allocation + // (e.g. task-shared pools registered by multiple execution contexts) + let mut seen = std::collections::HashSet::new(); + pools + .values() + .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) + .map(|p| p.reserved()) + .sum::<usize>() + }) .unwrap_or(0) } From fed742817e8bd031ff3559158b0dcbcd536d85ad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 10:04:54 -0600 Subject: [PATCH 3/6] feat: change default memory pool back to fair_unified Make fair_unified_task_shared opt-in rather than the default to simplify review. Update docs to reflect the new default. 
--- .../main/scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/latest/tuning.md | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 223b6f52eb..b63a563ec7 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -654,7 +654,7 @@ object CometConf extends ShimCometConf { "(e.g. a shuffle writer and its child plan) execute concurrently. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified_task_shared") + .createWithDefault("fair_unified") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 54e34d5b83..517bfe3bb9 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -58,21 +58,21 @@ Comet implements multiple memory pool implementations. The type of pool can be s The valid pool types are: -- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified` +- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified_task_shared` - `greedy_unified` -The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the -same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple -native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own -pool at the full per-task limit, effectively doubling the memory that can be consumed. - The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). 
This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit. +The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the +same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple +native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own +pool at the full per-task limit, effectively doubling the memory that can be consumed. + The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. From f8b15e544b36b39da137b887b35f35375443387a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 10:06:44 -0600 Subject: [PATCH 4/6] docs: explain dual native plan architecture in tuning guide Add context about how Comet creates two concurrent native plans per Spark task during shuffle and why this matters for pool selection. --- docs/source/user-guide/latest/tuning.md | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 517bfe3bb9..00fdf0203a 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -56,6 +56,11 @@ For more details about Spark off-heap memory mode, please refer to [Spark docume Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`. 
+When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan +(e.g. scan, filter, join) and the shuffle writer plan. These two plans run concurrently in a pipelined fashion — +the child plan produces batches that the shuffle writer consumes and repartitions. This means both plans hold +memory reservations at the same time, which is important to understand when choosing a memory pool type. + The valid pool types are: - `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) @@ -66,12 +71,14 @@ The `fair_unified` pool prevents operators from using more than an even fraction (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool -type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit. +type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit when two +plans are running concurrently (e.g. during shuffle). The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the -same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple -native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own -pool at the full per-task limit, effectively doubling the memory that can be consumed. +same Spark task. Because the child plan and shuffle writer each get their own pool with `fair_unified`, both can +independently allocate up to the full per-task memory limit, effectively allowing 2x the intended memory to be +consumed. 
The `fair_unified_task_shared` pool avoids this by sharing a single pool instance, ensuring that the +combined memory usage of both plans stays within the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. From 394f0387839f0ced4874dff2b56392ec0b787bf4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 22:04:48 -0600 Subject: [PATCH 5/6] change default back --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/latest/tuning.md | 4 ++-- native/Cargo.lock | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b63a563ec7..223b6f52eb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -654,7 +654,7 @@ object CometConf extends ShimCometConf { "(e.g. a shuffle writer and its child plan) execute concurrently. 
" + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified") + .createWithDefault("fair_unified_task_shared") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 00fdf0203a..b836861e7b 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -63,8 +63,8 @@ memory reservations at the same time, which is important to understand when choo The valid pool types are: -- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified_task_shared` +- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified` - `greedy_unified` The `fair_unified` pool prevents operators from using more than an even fraction of the available memory diff --git a/native/Cargo.lock b/native/Cargo.lock index 480f7ad06d..b5c7f2b0c7 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2070,9 +2070,7 @@ dependencies = [ "num", "rand 0.10.0", "regex", - "serde", "serde_json", - "thiserror 2.0.18", "tokio", "twox-hash", ] @@ -5885,7 +5883,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "memchr", "serde", From a5ed8d3a8abe871193bb75665a632bc809896e5f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 22:07:56 -0600 Subject: [PATCH 6/6] prettier --- docs/source/user-guide/latest/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index b836861e7b..55005e94f6 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -64,7 +64,7 @@ memory reservations at the same time, which is 
important to understand when choo The valid pool types are: - `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified` +- `fair_unified` - `greedy_unified` The `fair_unified` pool prevents operators from using more than an even fraction of the available memory