diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d3020c2756fff..e5bdff70b115b 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -48,6 +48,7 @@ use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; +use datafusion_common::instant::Instant; use datafusion_common::stats::Precision; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; @@ -418,7 +419,7 @@ impl RepartitionExecState { /// A utility that can be used to partition batches based on [`Partitioning`] pub struct BatchPartitioner { state: BatchPartitionerState, - timer: metrics::Time, + metrics: BatchPartitionerMetrics, } enum BatchPartitionerState { @@ -438,7 +439,43 @@ enum BatchPartitionerState { /// executions and runs. pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seed(0); +#[derive(Debug, Clone)] +struct BatchPartitionerMetrics { + /// End-to-end time spent repartitioning a batch. + total_time: metrics::Time, + /// Time spent routing row indices to output partitions. + route_time: metrics::Time, + /// Time spent materializing partitioned output batches from routed indices. 
+ batch_build_time: metrics::Time, +} + +impl BatchPartitionerMetrics { + fn new(total_time: metrics::Time) -> Self { + Self { + total_time, + route_time: metrics::Time::new(), + batch_build_time: metrics::Time::new(), + } + } +} + impl BatchPartitioner { + fn new_hash_partitioner_with_metrics( + exprs: Vec<Arc<dyn PhysicalExpr>>, + num_partitions: usize, + metrics: BatchPartitionerMetrics, + ) -> Self { + Self { + state: BatchPartitionerState::Hash { + exprs, + num_partitions, + hash_buffer: vec![], + indices: vec![vec![]; num_partitions], + }, + metrics, + } + } + /// Create a new [`BatchPartitioner`] for hash-based repartitioning. /// /// # Parameters @@ -452,15 +489,26 @@ impl BatchPartitioner { exprs: Vec<Arc<dyn PhysicalExpr>>, num_partitions: usize, timer: metrics::Time, + ) -> Self { + Self::new_hash_partitioner_with_metrics( + exprs, + num_partitions, + BatchPartitionerMetrics::new(timer), + ) + } + + fn new_round_robin_partitioner_with_metrics( + num_partitions: usize, + metrics: BatchPartitionerMetrics, + input_partition: usize, + num_input_partitions: usize, ) -> Self { Self { - state: BatchPartitionerState::Hash { - exprs, + state: BatchPartitionerState::RoundRobin { num_partitions, - hash_buffer: vec![], - indices: vec![vec![]; num_partitions], + next_idx: (input_partition * num_partitions) / num_input_partitions, }, - timer, + metrics, } } @@ -481,13 +529,12 @@ impl BatchPartitioner { input_partition: usize, num_input_partitions: usize, ) -> Self { - Self { - state: BatchPartitionerState::RoundRobin { - num_partitions, - next_idx: (input_partition * num_partitions) / num_input_partitions, - }, - timer, - } + Self::new_round_robin_partitioner_with_metrics( + num_partitions, + BatchPartitionerMetrics::new(timer), + input_partition, + num_input_partitions, + ) + } /// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme. 
/// @@ -560,8 +607,11 @@ impl BatchPartitioner { num_partitions, next_idx, } => { + let _total_time_scope = self.metrics.total_time.timer(); let idx = *next_idx; *next_idx = (*next_idx + 1) % *num_partitions; + // Round-robin has no routing or batch rebuilding cost. + // The entire batch moves to a single output partition. Box::new(std::iter::once(Ok((idx, batch)))) } BatchPartitionerState::Hash { @@ -570,9 +620,8 @@ impl BatchPartitioner { hash_buffer, indices, } => { - // Tracking time required for distributing indexes across output partitions - let timer = self.timer.timer(); - + // Track the overall time spent repartitioning this input batch. + let _total_time_scope = self.metrics.total_time.timer(); let arrays = evaluate_expressions_to_arrays(exprs.as_slice(), &batch)?; @@ -585,44 +634,42 @@ impl BatchPartitioner { hash_buffer, )?; - indices.iter_mut().for_each(|v| v.clear()); + { + // Measure distributing row indices across output partitions. + let _route_time_scope = self.metrics.route_time.timer(); + indices.iter_mut().for_each(|v| v.clear()); - for (index, hash) in hash_buffer.iter().enumerate() { - indices[(*hash % *partitions as u64) as usize].push(index as u32); + for (index, hash) in hash_buffer.iter().enumerate() { + indices[(*hash % *partitions as u64) as usize] + .push(index as u32); + } } - // Finished building index-arrays for output partitions - timer.done(); - - // Borrowing partitioner timer to prevent moving `self` to closure - let partitioner_timer = &self.timer; - let mut partitioned_batches = vec![]; for (partition, p_indices) in indices.iter_mut().enumerate() { if !p_indices.is_empty() { let taken_indices = std::mem::take(p_indices); let indices_array: PrimitiveArray<UInt32Type> = taken_indices.into(); - - // Tracking time required for repartitioned batches construction - let _timer = partitioner_timer.timer(); - - // Produce batches based on indices - let columns = - take_arrays(batch.columns(), &indices_array, None)?; - - let mut options = 
RecordBatchOptions::new(); - options = options.with_row_count(Some(indices_array.len())); - let batch = RecordBatch::try_new_with_options( - batch.schema(), - columns, - &options, - ) - .unwrap(); + let batch = { + // Measure rebuilding output arrays from the routed indices. + let _batch_build_time_scope = + self.metrics.batch_build_time.timer(); + let columns = + take_arrays(batch.columns(), &indices_array, None)?; + + let mut options = RecordBatchOptions::new(); + options = + options.with_row_count(Some(indices_array.len())); + RecordBatch::try_new_with_options( + batch.schema(), + columns, + &options, + ) + .unwrap() + }; partitioned_batches.push(Ok((partition, batch))); - - // Return the taken vec let (_, buffer, _) = indices_array.into_parts(); let mut vec = buffer.into_inner().into_vec::<u32>().map_err(|e| { @@ -771,14 +818,18 @@ pub struct RepartitionExec { #[derive(Debug, Clone)] struct RepartitionMetrics { - /// Time in nanos to execute child operator and fetch batches + /// Time in nanos to execute child operator and fetch batches. fetch_time: metrics::Time, - /// Repartitioning elapsed time in nanos + /// Repartitioning elapsed time in nanos. repartition_time: metrics::Time, - /// Time in nanos for sending resulting batches to channels. - /// - /// One metric per output partition. - send_time: Vec<metrics::Time>, + /// Time in nanos spent routing row indices to output partitions. + route_time: metrics::Time, + /// Time in nanos spent constructing output batches. + batch_build_time: metrics::Time, + /// Time in nanos spent waiting for channel capacity. + channel_wait_time: Vec<metrics::Time>, + /// Time in nanos spent writing spilled batches. 
+ spill_write_time: Vec<metrics::Time>, } impl RepartitionMetrics { @@ -794,22 +845,44 @@ impl RepartitionMetrics { // Time in nanos to perform repartitioning let repartition_time = MetricBuilder::new(metrics).subset_time("repartition_time", input_partition); + let route_time = + MetricBuilder::new(metrics).subset_time("route_time", input_partition); + let batch_build_time = + MetricBuilder::new(metrics).subset_time("batch_build_time", input_partition); - // Time in nanos for sending resulting batches to channels - let send_time = (0..num_output_partitions) + let channel_wait_time = (0..num_output_partitions) .map(|output_partition| { let label = metrics::Label::new("outputPartition", output_partition.to_string()); MetricBuilder::new(metrics) .with_label(label) - .subset_time("send_time", input_partition) + .subset_time("channel_wait_time", input_partition) + }) + .collect(); + let spill_write_time = (0..num_output_partitions) + .map(|output_partition| { + let label = + metrics::Label::new("outputPartition", output_partition.to_string()); + MetricBuilder::new(metrics) + .with_label(label) + .subset_time("spill_write_time", input_partition) }) .collect(); - Self { fetch_time, repartition_time, - send_time, + route_time, + batch_build_time, + channel_wait_time, + spill_write_time, + } + } + + fn batch_partitioner_metrics(&self) -> BatchPartitionerMetrics { + BatchPartitionerMetrics { + total_time: self.repartition_time.clone(), + route_time: self.route_time.clone(), + batch_build_time: self.batch_build_time.clone(), } } } @@ -1047,6 +1120,8 @@ impl ExecutionPlan for RepartitionExec { Arc::clone(&abort_helper), Arc::clone(&reservation), spill_stream, + MetricBuilder::new(&metrics) + .subset_time("spill_read_wait_time", partition), 1, // Each receiver handles one input partition BaselineMetrics::new(&metrics, partition), None, // subsequent merge sort already does batching 
https://github.com/apache/datafusion/blob/e4dcf0c85611ad0bd291f03a8e03fe56d773eb16/datafusion/physical-plan/src/sorts/merge.rs#L286 @@ -1086,6 +1161,8 @@ impl ExecutionPlan for RepartitionExec { abort_helper, reservation, spill_stream, + MetricBuilder::new(&metrics) + .subset_time("spill_read_wait_time", partition), num_input_partitions, BaselineMetrics::new(&metrics, partition), Some(context.session_config().batch_size()), @@ -1336,6 +1413,16 @@ impl RepartitionExec { } } + async fn send_with_metrics( + sender: &DistributionSender<MaybeBatch>, + batch: MaybeBatch, + channel_wait_time: &metrics::Time, + ) -> bool { + // Track time spent waiting for the repartition output channel to accept the batch. + let _channel_wait_time_scope = channel_wait_time.timer(); + sender.send(batch).await.is_ok() + } + /// Pulls data from the specified input plan, feeding it to the /// output partitions based on the desired partitioning /// @@ -1350,16 +1437,16 @@ impl RepartitionExec { ) -> Result<()> { let mut partitioner = match &partitioning { Partitioning::Hash(exprs, num_partitions) => { - BatchPartitioner::new_hash_partitioner( + BatchPartitioner::new_hash_partitioner_with_metrics( exprs.clone(), *num_partitions, - metrics.repartition_time.clone(), + metrics.batch_partitioner_metrics(), ) } Partitioning::RoundRobinBatch(num_partitions) => { - BatchPartitioner::new_round_robin_partitioner( + BatchPartitioner::new_round_robin_partitioner_with_metrics( *num_partitions, - metrics.repartition_time.clone(), + metrics.batch_partitioner_metrics(), input_partition, num_input_partitions, ) @@ -1392,7 +1479,6 @@ impl RepartitionExec { let (partition, batch) = res?; let size = batch.get_array_memory_size(); - let timer = metrics.send_time[partition].timer(); // if there is still a receiver, send to it if let Some(channel) = output_channels.get_mut(&partition) { let (batch_to_send, is_memory_batch) = @@ -1404,13 +1490,24 @@ impl RepartitionExec { Err(_) => { // We're memory limited - spill to SpillPool 
// SpillPool handles file handle reuse and rotation - channel.spill_writer.push_batch(&batch)?; + { + // Track writing the spilled batch payload. + let _spill_write_time_scope = + metrics.spill_write_time[partition].timer(); + channel.spill_writer.push_batch(&batch)?; + } // Send marker indicating batch was spilled (RepartitionBatch::Spilled, false) } }; - if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { + if !Self::send_with_metrics( + &channel.sender, + Some(Ok(batch_to_send)), + &metrics.channel_wait_time[partition], + ) + .await + { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) // Only shrink memory if it was a memory batch if is_memory_batch { @@ -1419,7 +1516,6 @@ impl RepartitionExec { output_channels.remove(&partition); } } - timer.done(); } // If the input stream is endless, we may spin forever and @@ -1552,10 +1648,10 @@ enum StreamState { /// This struct converts a receiver to a stream. /// Receiver receives data on an SPSC channel. struct PerPartitionStream { - /// Schema wrapped by Arc + /// Schema wrapped by Arc. schema: SchemaRef, - /// channel containing the repartitioned batches + /// Channel containing the repartitioned batches. receiver: DistributionReceiver<MaybeBatch>, /// Handle to ensure background tasks are killed when no longer needed. @@ -1564,10 +1660,16 @@ struct PerPartitionStream { /// Memory reservation. reservation: SharedMemoryReservation, - /// Infinite stream for reading from the spill pool + /// Infinite stream for reading from the spill pool. spill_stream: SendableRecordBatchStream, - /// Internal state indicating if we are reading from memory or spill stream + /// Time spent waiting for spilled batches to become readable. + spill_read_wait_time: metrics::Time, + + /// Start time for the current wait on the spill stream, if any. + spill_wait_start: Option<Instant>, + + /// Internal state indicating if we are reading from memory or spill stream. 
state: StreamState, /// Number of input partitions that have not yet finished. @@ -1575,10 +1677,10 @@ struct PerPartitionStream { /// each sending None when complete. We must wait for all of them. remaining_partitions: usize, - /// Execution metrics + /// Execution metrics. baseline_metrics: BaselineMetrics, - /// None for sort preserving variant (merge sort already does coalescing) + /// None for sort preserving variant (merge sort already does coalescing). batch_coalescer: Option<BatchCoalescer>, } @@ -1590,6 +1692,7 @@ impl PerPartitionStream { drop_helper: Arc<Vec<SpawnedTask<()>>>, reservation: SharedMemoryReservation, spill_stream: SendableRecordBatchStream, + spill_read_wait_time: metrics::Time, num_input_partitions: usize, baseline_metrics: BaselineMetrics, batch_size: Option<usize>, @@ -1602,6 +1705,8 @@ impl PerPartitionStream { _drop_helper: drop_helper, reservation, spill_stream, + spill_read_wait_time, + spill_wait_start: None, state: StreamState::ReadingMemory, remaining_partitions: num_input_partitions, baseline_metrics, @@ -1668,18 +1773,27 @@ impl PerPartitionStream { StreamState::ReadingSpilled => { // Poll spill stream for the spilled batch match self.spill_stream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(batch))) => { - self.state = StreamState::ReadingMemory; - return Poll::Ready(Some(Ok(batch))); - } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(Err(e))); - } - Poll::Ready(None) => { - // Spill stream ended, keep draining the memory channel - self.state = StreamState::ReadingMemory; + Poll::Ready(result) => { + // Record any accumulated wait time on the spill stream. 
+ if let Some(start) = self.spill_wait_start.take() { + self.spill_read_wait_time.add_elapsed(start); + } + match result { + Some(Ok(batch)) => { + self.state = StreamState::ReadingMemory; + return Poll::Ready(Some(Ok(batch))); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + None => { + // Spill stream ended, keep draining the memory channel + self.state = StreamState::ReadingMemory; + } + } } Poll::Pending => { + self.spill_wait_start.get_or_insert_with(Instant::now); // Spilled batch not ready yet, must wait // This preserves ordering by blocking until spill data arrives return Poll::Pending; @@ -2893,13 +3007,31 @@ mod test { // Verify metrics are available let metrics = exec.metrics().unwrap(); - // Just verify the metrics can be retrieved (spilling may or may not occur) - let spill_count = metrics.spill_count().unwrap_or(0); + // Verify spilling occurred and the repartition metrics were recorded. + let spill_count = metrics.spill_count().unwrap(); assert!(spill_count > 0); - let spilled_bytes = metrics.spilled_bytes().unwrap_or(0); + let spilled_bytes = metrics.spilled_bytes().unwrap(); assert!(spilled_bytes > 0); - let spilled_rows = metrics.spilled_rows().unwrap_or(0); + let spilled_rows = metrics.spilled_rows().unwrap(); assert!(spilled_rows > 0); + let fetch_time = metrics.sum_by_name("fetch_time").unwrap().as_usize(); + assert!(fetch_time > 0); + let repartition_time = + metrics.sum_by_name("repartition_time").unwrap().as_usize(); + assert!(repartition_time > 0); + let route_time = metrics.sum_by_name("route_time").unwrap().as_usize(); + assert!(route_time > 0); + let batch_build_time = + metrics.sum_by_name("batch_build_time").unwrap().as_usize(); + assert!(batch_build_time > 0); + assert!(repartition_time >= route_time + batch_build_time); + let channel_wait_time = + metrics.sum_by_name("channel_wait_time").unwrap().as_usize(); + assert!(channel_wait_time > 0); + let spill_write_time = + 
metrics.sum_by_name("spill_write_time").unwrap().as_usize(); + assert!(spill_write_time > 0); + assert!(metrics.sum_by_name("spill_read_wait_time").is_some()); Ok(()) } diff --git a/docs/source/user-guide/sql/explain.md b/docs/source/user-guide/sql/explain.md index 23101632625b1..d19a5135059f9 100644 --- a/docs/source/user-guide/sql/explain.md +++ b/docs/source/user-guide/sql/explain.md @@ -236,14 +236,13 @@ EXPLAIN ANALYZE SELECT SUM(x) FROM table GROUP BY b; +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+ -| Plan with Metrics | CoalescePartitionsExec, metrics=[] | -| | ProjectionExec: expr=[SUM(table.x)@1 as SUM(x)], metrics=[] | -| | HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[outputRows=2] | -| | CoalesceBatchesExec: target_batch_size=4096, metrics=[] | -| | RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877] | -| | HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2] | -| | RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetchTime=5660489, repartitionTime=0, sendTime=8012] | -| | DataSourceExec: file_groups={1 group: [[/tmp/table.csv]]}, has_header=false, metrics=[] | +| Plan with Metrics | ProjectionExec: expr=[sum(table.x)@1 as sum(table.x)], metrics=[output_rows=2, elapsed_compute=7.30µs, output_bytes=64.0 B, output_batches=2, expr_0_eval_time=764ns] | +| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(table.x)], metrics=[output_rows=2, elapsed_compute=699.21µs, output_bytes=1088.0 B, output_batches=2, spill_count=0, spilled_bytes=0.0 B, 
spilled_rows=0, peak_mem_used=17.60 K, aggregate_arguments_time=6.55µs, aggregation_time=6.97µs, emitting_time=8.47µs, time_calculating_group_ids=5.89µs] | +| | RepartitionExec: partitioning=Hash([b@0], 16), input_partitions=16, metrics=[output_rows=2, elapsed_compute=135.12µs, output_bytes=192.0 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batch_build_time=29.35µs, channel_wait_time=4.34µs, fetch_time=14.53ms, repartition_time=49.60µs, route_time=1.97µs, spill_read_wait_time=16ns, spill_write_time=256ns] | +| | AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(table.x)], metrics=[output_rows=2, elapsed_compute=242.71µs, output_bytes=544.0 B, output_batches=1, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=5.28 K, aggregate_arguments_time=31.31µs, aggregation_time=19.39µs, emitting_time=9.64µs, time_calculating_group_ids=10.77µs, reduction_factor=20% (2/10)] | +| | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[output_rows=10, elapsed_compute=50.25µs, output_bytes=64.0 KB, output_batches=1, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batch_build_time=1ns, channel_wait_time=3.43µs, fetch_time=792.21µs, repartition_time=292ns, route_time=1ns, spill_read_wait_time=16ns, spill_write_time=16ns] | +| | DataSourceExec: file_groups={1 group: [[tmp/datafusion-explain/table.csv]]}, projection=[x, b], file_type=csv, has_header=true, metrics=[output_rows=10, elapsed_compute=582.33µs, output_bytes=128.0 B, output_batches=1, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, time_elapsed_opening=140.33µs, time_elapsed_processing=721.33µs, time_elapsed_scanning_total=613.13µs, time_elapsed_scanning_until_data=588.17µs] | +| | | +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+ ```