8 changes: 6 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -647,10 +647,14 @@ object CometConf extends ShimCometConf {
     .category(CATEGORY_TUNING)
     .doc(
       "The type of memory pool to be used for Comet native execution when running Spark in " +
-        "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " +
+        "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " +
+        "`fair_unified_task_shared`. The `fair_unified_task_shared` pool is shared across " +
+        "all native plans within the same Spark task, ensuring that the total memory " +
+        "consumption does not exceed the per-task limit even when multiple native plans " +
+        "(e.g. a shuffle writer and its child plan) execute concurrently. " +
       s"$TUNING_GUIDE.")
     .stringConf
-    .createWithDefault("fair_unified")
+    .createWithDefault("fair_unified_task_shared")
 
   val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
     "spark.comet.exec.onHeap.memoryPool")
20 changes: 17 additions & 3 deletions docs/source/user-guide/latest/tuning.md
@@ -56,15 +56,29 @@ For more details about Spark off-heap memory mode, please refer to [Spark docume
 
 Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.
 
+When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan
+(e.g. scan, filter, join) and the shuffle writer plan. These two plans run concurrently in a pipelined fashion —
+the child plan produces batches that the shuffle writer consumes and repartitions. This means both plans hold
+memory reservations at the same time, which is important to understand when choosing a memory pool type.
+
 The valid pool types are:
 
-- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
+- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set)
+- `fair_unified`
 - `greedy_unified`
 
-The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory
+The `fair_unified` pool prevents operators from using more than an even fraction of the available memory
 (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
 the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
-when there is sufficient memory in order to leave enough memory for other operators.
+when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool
+type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit when two
+plans are running concurrently (e.g. during shuffle).
+
+The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the
+same Spark task. Because the child plan and shuffle writer each get their own pool with `fair_unified`, both can
+independently allocate up to the full per-task memory limit, effectively allowing 2x the intended memory to be
+consumed. The `fair_unified_task_shared` pool avoids this by sharing a single pool instance, ensuring that the
+combined memory usage of both plans stays within the per-task limit.
 
 The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
 need to spill or have a single spillable operator.
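The fair-share arithmetic and the 2x overshoot described in this documentation change can be sketched in isolation. This is a hypothetical illustration, not Comet code: the `fair_share` function and the byte counts are stand-ins for the `pool_size / num_reservations` rule quoted above.

```rust
use std::cmp::max;

// Hypothetical illustration of the fair-share limit: each registered
// consumer may reserve at most pool_size / num_reservations bytes.
fn fair_share(pool_size: usize, num_reservations: usize) -> usize {
    pool_size / max(num_reservations, 1)
}

fn main() {
    // A 4 GiB pool split fairly across 4 consumers caps each at 1 GiB.
    assert_eq!(fair_share(4 << 30, 4), 1 << 30);

    // With `fair_unified`, a shuffle task runs two plans, each with its own
    // pool, so worst-case usage is 2 * pool_size. The task-shared variant
    // hands both plans the same pool, keeping the cap at pool_size.
    let per_plan_pool = 4_usize << 30;
    assert_eq!(per_plan_pool * 2, 8 << 30); // the 2x overshoot being avoided
}
```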
4 changes: 1 addition & 3 deletions native/Cargo.lock


18 changes: 16 additions & 2 deletions native/core/src/execution/jni_api.rs
@@ -141,15 +141,29 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize {
             map.remove(&thread_id);
             return 0;
         }
-        return pools.values().map(|p| p.reserved()).sum::<usize>();
+        let mut seen = std::collections::HashSet::new();
+        return pools
+            .values()
+            .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize))
+            .map(|p| p.reserved())
+            .sum::<usize>();
     }
     0
 }
 
 fn total_reserved_for_thread(thread_id: u64) -> usize {
     let map = get_thread_memory_pools().lock();
     map.get(&thread_id)
-        .map(|pools| pools.values().map(|p| p.reserved()).sum::<usize>())
+        .map(|pools| {
+            // Deduplicate pools that share the same underlying allocation
+            // (e.g. task-shared pools registered by multiple execution contexts)
+            let mut seen = std::collections::HashSet::new();
+            pools
+                .values()
+                .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize))
+                .map(|p| p.reserved())
+                .sum::<usize>()
+        })
         .unwrap_or(0)
 }
 
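The pointer-identity deduplication in this hunk can be demonstrated on its own. The sketch below is hypothetical (the `sum_unique` function and the `Arc<usize>` stand-in for a memory pool are not Comet code), but the `HashSet` plus `Arc::as_ptr` filtering is the same idea: when several contexts register clones of one shared pool, summing per entry would double-count it, so each underlying allocation is counted once.

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Sum a per-pool metric while counting each underlying pool exactly once,
// keyed on the Arc's pointer identity rather than on the map entry.
fn sum_unique(pools: &[Arc<usize>]) -> usize {
    let mut seen = HashSet::new();
    pools
        .iter()
        .filter(|p| seen.insert(Arc::as_ptr(p) as usize))
        .map(|p| **p)
        .sum()
}

fn main() {
    let shared = Arc::new(100usize); // stands in for a pool with 100 bytes reserved
    let private = Arc::new(40usize);
    let pools = vec![Arc::clone(&shared), Arc::clone(&shared), private];
    // A naive sum would report 240; deduplication counts `shared` once.
    assert_eq!(sum_unique(&pools), 140);
}
```

Note that the Comet code casts through `*const ()` first because the pools are trait objects (`Arc<dyn MemoryPool>`), whose raw pointers are fat; the thin `Arc<usize>` here does not need that step.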
8 changes: 7 additions & 1 deletion native/core/src/execution/memory_pools/config.rs
@@ -21,6 +21,7 @@ use crate::errors::{CometError, CometResult};
 pub(crate) enum MemoryPoolType {
     GreedyUnified,
     FairUnified,
+    FairUnifiedTaskShared,
     Greedy,
     FairSpill,
     GreedyTaskShared,
@@ -34,7 +35,9 @@ impl MemoryPoolType {
     pub(crate) fn is_task_shared(&self) -> bool {
         matches!(
             self,
-            MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared
+            MemoryPoolType::GreedyTaskShared
+                | MemoryPoolType::FairSpillTaskShared
+                | MemoryPoolType::FairUnifiedTaskShared
         )
     }
 }
@@ -63,6 +66,9 @@ pub(crate) fn parse_memory_pool_config(
     let memory_pool_config = if off_heap_mode {
         match memory_pool_type.as_str() {
             "fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
+            "fair_unified_task_shared" => {
+                MemoryPoolConfig::new(MemoryPoolType::FairUnifiedTaskShared, pool_size)
+            }
             "greedy_unified" => {
                 // the `unified` memory pool interacts with Spark's memory pool to allocate
                 // memory therefore does not need a size to be explicitly set. The pool size
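As a standalone sketch of the string-to-enum dispatch extended here (the `PoolType` and `parse_pool_type` names are hypothetical stand-ins, not Comet's actual API), the new config value simply maps to the new variant and unknown values are rejected:

```rust
// Hypothetical mirror of the pool-type parsing shown in the hunk above.
#[derive(Debug, PartialEq)]
enum PoolType {
    GreedyUnified,
    FairUnified,
    FairUnifiedTaskShared,
}

fn parse_pool_type(name: &str) -> Result<PoolType, String> {
    match name {
        "greedy_unified" => Ok(PoolType::GreedyUnified),
        "fair_unified" => Ok(PoolType::FairUnified),
        "fair_unified_task_shared" => Ok(PoolType::FairUnifiedTaskShared),
        other => Err(format!("unsupported off-heap memory pool type: {other}")),
    }
}

fn main() {
    assert_eq!(
        parse_pool_type("fair_unified_task_shared"),
        Ok(PoolType::FairUnifiedTaskShared)
    );
    assert!(parse_pool_type("bogus").is_err());
}
```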
16 changes: 16 additions & 0 deletions native/core/src/execution/memory_pools/mod.rs
@@ -87,6 +87,22 @@ pub(crate) fn create_memory_pool(
             });
             Arc::clone(memory_pool)
         }
+        MemoryPoolType::FairUnifiedTaskShared => {
+            let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
+            let per_task_memory_pool =
+                memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
+                    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
+                        CometFairMemoryPool::new(
+                            Arc::clone(&comet_task_memory_manager),
+                            memory_pool_config.pool_size,
+                        ),
+                        NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
+                    ));
+                    PerTaskMemoryPool::new(pool)
+                });
+            per_task_memory_pool.num_plans += 1;
+            Arc::clone(&per_task_memory_pool.memory_pool)
+        }
         MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => {
             let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
             let per_task_memory_pool =
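The task-shared registry pattern in the last hunk (look up by task attempt id, create the pool on first use, hand out clones of the same `Arc` to every later plan) can be sketched with plain standard-library types. The `PerTaskPool` and `acquire` names below are hypothetical stand-ins; `Arc<Vec<u8>>` stands in for `Arc<dyn MemoryPool>`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical per-task registry mirroring the entry()/or_insert_with() pattern:
// the first plan for a task creates the shared pool; later plans for the same
// task receive a clone of the same Arc, so all plans draw from one pool.
struct PerTaskPool {
    pool: Arc<Vec<u8>>, // stands in for Arc<dyn MemoryPool>
    num_plans: usize,
}

fn acquire(map: &Mutex<HashMap<i64, PerTaskPool>>, task_attempt_id: i64) -> Arc<Vec<u8>> {
    let mut guard = map.lock().unwrap();
    let entry = guard.entry(task_attempt_id).or_insert_with(|| PerTaskPool {
        pool: Arc::new(Vec::new()),
        num_plans: 0,
    });
    entry.num_plans += 1;
    Arc::clone(&entry.pool)
}

fn main() {
    let map = Mutex::new(HashMap::new());
    let child_plan_pool = acquire(&map, 7);
    let shuffle_writer_pool = acquire(&map, 7);
    // Both plans of task 7 share one pool instance, so their combined
    // reservations are bounded by a single per-task limit.
    assert!(Arc::ptr_eq(&child_plan_pool, &shuffle_writer_pool));
    assert_eq!(map.lock().unwrap()[&7].num_plans, 2);
}
```

The `num_plans` counter matters for cleanup: the real code presumably drops the map entry once every plan registered against the task has released it.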