diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..223b6f52eb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -647,10 +647,14 @@ object CometConf extends ShimCometConf { .category(CATEGORY_TUNING) .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + - "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " + + "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " + + "`fair_unified_task_shared`. The `fair_unified_task_shared` pool is shared across " + + "all native plans within the same Spark task, ensuring that the total memory " + + "consumption does not exceed the per-task limit even when multiple native plans " + + "(e.g. a shuffle writer and its child plan) execute concurrently. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified") + .createWithDefault("fair_unified_task_shared") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..55005e94f6 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -56,15 +56,29 @@ For more details about Spark off-heap memory mode, please refer to [Spark docume Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`. +When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan +(e.g. scan, filter, join) and the shuffle writer plan. These two plans run concurrently in a pipelined fashion — +the child plan produces batches that the shuffle writer consumes and repartitions. 
This means both plans hold +memory reservations at the same time, which is important to understand when choosing a memory pool type. + The valid pool types are: -- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified` - `greedy_unified` -The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory +The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even -when there is sufficient memory in order to leave enough memory for other operators. +when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool +type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit when two +plans are running concurrently (e.g. during shuffle). + +The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the +same Spark task. Because the child plan and shuffle writer each get their own pool with `fair_unified`, both can +independently allocate up to the full per-task memory limit, effectively allowing 2x the intended memory to be +consumed. The `fair_unified_task_shared` pool avoids this by sharing a single pool instance, ensuring that the +combined memory usage of both plans stays within the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. 
diff --git a/native/Cargo.lock b/native/Cargo.lock index 480f7ad06d..b5c7f2b0c7 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2070,9 +2070,7 @@ dependencies = [ "num", "rand 0.10.0", "regex", - "serde", "serde_json", - "thiserror 2.0.18", "tokio", "twox-hash", ] @@ -5885,7 +5883,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "memchr", "serde", diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 93f75bae96..eee80ff2d9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -141,7 +141,12 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { map.remove(&thread_id); return 0; } - return pools.values().map(|p| p.reserved()).sum::<usize>(); + let mut seen = std::collections::HashSet::new(); + return pools + .values() + .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) + .map(|p| p.reserved()) + .sum::<usize>(); } 0 } @@ -149,7 +154,16 @@ fn total_reserved_for_thread(thread_id: u64) -> usize { let map = get_thread_memory_pools().lock(); map.get(&thread_id) - .map(|pools| pools.values().map(|p| p.reserved()).sum::<usize>()) + .map(|pools| { + // Deduplicate pools that share the same underlying allocation + // (e.g.
task-shared pools registered by multiple execution contexts) let mut seen = std::collections::HashSet::new(); pools .values() .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) .map(|p| p.reserved()) .sum::<usize>() }) .unwrap_or(0) } diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index d30126a99a..4f2c03ecbb 100644 @@ -21,6 +21,7 @@ use crate::errors::{CometError, CometResult}; pub(crate) enum MemoryPoolType { GreedyUnified, FairUnified, + FairUnifiedTaskShared, Greedy, FairSpill, GreedyTaskShared, @@ -34,7 +35,9 @@ impl MemoryPoolType { pub(crate) fn is_task_shared(&self) -> bool { matches!( self, - MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared + MemoryPoolType::GreedyTaskShared + | MemoryPoolType::FairSpillTaskShared + | MemoryPoolType::FairUnifiedTaskShared ) } } @@ -63,6 +66,9 @@ pub(crate) fn parse_memory_pool_config( let memory_pool_config = if off_heap_mode { match memory_pool_type.as_str() { "fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size), + "fair_unified_task_shared" => { MemoryPoolConfig::new(MemoryPoolType::FairUnifiedTaskShared, pool_size) } "greedy_unified" => { // the `unified` memory pool interacts with Spark's memory pool to allocate // memory therefore does not need a size to be explicitly set.
The pool size diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index d44290d058..cfb708b58f 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -87,6 +87,22 @@ pub(crate) fn create_memory_pool( }); Arc::clone(memory_pool) } + MemoryPoolType::FairUnifiedTaskShared => { + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new( + CometFairMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + memory_pool_config.pool_size, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) + } MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => { let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); let per_task_memory_pool =