diff --git a/native/Cargo.lock b/native/Cargo.lock index 4f7f5424b7..55933e767d 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -96,12 +96,56 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.60.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.60.2", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -1331,6 +1375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] @@ -1339,8 +1384,22 @@ version = "4.5.60" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -1358,6 +1417,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "combine" version = "4.6.7" @@ -1953,6 +2018,7 @@ dependencies = [ "arrow", "async-trait", "bytes", + "clap", "crc32c", "crc32fast", "criterion", @@ -1965,6 +2031,7 @@ dependencies = [ "jni 0.21.1", "log", "lz4_flex 0.13.0", + "parquet", "simd-adler32", "snap", "tempfile", @@ -3632,6 +3699,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.13.0" @@ -4354,6 +4427,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" @@ -6414,6 +6493,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] 
+name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.23.0" diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml index 5cd7cd43ef..505879e319 100644 --- a/native/shuffle/Cargo.toml +++ b/native/shuffle/Cargo.toml @@ -32,6 +32,7 @@ publish = false arrow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +clap = { version = "4", features = ["derive"], optional = true } crc32c = "0.6.8" crc32fast = "1.3.2" datafusion = { workspace = true } @@ -43,6 +44,8 @@ itertools = "0.14.0" jni = "0.21" log = "0.4" lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] } +# parquet is only used by the shuffle_bench binary (shuffle-bench feature) +parquet = { workspace = true, optional = true } simd-adler32 = "0.3.9" snap = "1.1" tokio = { version = "1", features = ["rt-multi-thread"] } @@ -54,10 +57,18 @@ datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } itertools = "0.14.0" tempfile = "3.26.0" +[features] +shuffle-bench = ["clap", "parquet"] + [lib] name = "datafusion_comet_shuffle" path = "src/lib.rs" +[[bin]] +name = "shuffle_bench" +path = "src/bin/shuffle_bench.rs" +required-features = ["shuffle-bench"] + [[bench]] name = "shuffle_writer" harness = false diff --git a/native/shuffle/README.md b/native/shuffle/README.md index 8fba6b0323..0f53604fa3 100644 --- a/native/shuffle/README.md +++ b/native/shuffle/README.md @@ -23,3 +23,44 @@ This crate provides the shuffle writer and reader implementation for Apache Data of the [Apache DataFusion Comet] subproject. [Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/ + +## Shuffle Benchmark Tool + +A standalone benchmark binary (`shuffle_bench`) is included for profiling shuffle write +performance outside of Spark. 
It streams input data directly from Parquet files. + +### Basic usage + +```sh +cargo run --release --features shuffle-bench --bin shuffle_bench -- \ + --input /data/tpch-sf100/lineitem/ \ + --partitions 200 \ + --codec lz4 \ + --hash-columns 0,3 +``` + +### Options + +| Option | Default | Description | +| --------------------- | -------------------------- | ------------------------------------------------------ | +| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | +| `--partitions` | `200` | Number of output shuffle partitions | +| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | +| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) | +| `--codec` | `lz4` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | +| `--zstd-level` | `1` | Zstd compression level (1–22) | +| `--batch-size` | `8192` | Batch size for reading Parquet data | +| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | +| `--write-buffer-size` | `1048576` | Write buffer size in bytes | +| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | +| `--iterations` | `1` | Number of timed iterations | +| `--warmup` | `0` | Number of warmup iterations before timing | +| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | + +### Profiling with flamegraph + +```sh +cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \ + --input /data/tpch-sf100/lineitem/ \ + --partitions 200 --codec lz4 +``` diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs new file mode 100644 index 0000000000..bb8c2a0380 --- /dev/null +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -0,0 +1,675 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Standalone shuffle benchmark tool for profiling Comet shuffle write +//! performance outside of Spark. Streams input directly from Parquet files. +//! +//! # Usage +//! +//! ```sh +//! cargo run --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 \ +//! --codec lz4 \ +//! --hash-columns 0,3 +//! ``` +//! +//! Profile with flamegraph: +//! ```sh +//! cargo flamegraph --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 --codec lz4 +//! 
``` + +use arrow::datatypes::{DataType, SchemaRef}; +use clap::Parser; +use datafusion::execution::config::SessionConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::metrics::{MetricValue, MetricsSet}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_comet_shuffle::{CometPartitioning, CompressionCodec, ShuffleWriterExec}; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Instant; + +#[derive(Parser, Debug)] +#[command( + name = "shuffle_bench", + about = "Standalone benchmark for Comet shuffle write performance" +)] +struct Args { + /// Path to input Parquet file or directory of Parquet files + #[arg(long)] + input: PathBuf, + + /// Batch size for reading Parquet data + #[arg(long, default_value_t = 8192)] + batch_size: usize, + + /// Number of output shuffle partitions + #[arg(long, default_value_t = 200)] + partitions: usize, + + /// Partitioning scheme: hash, single, round-robin + #[arg(long, default_value = "hash")] + partitioning: String, + + /// Column indices to hash on (comma-separated, e.g. 
"0,3") + #[arg(long, default_value = "0")] + hash_columns: String, + + /// Compression codec: none, lz4, zstd, snappy + #[arg(long, default_value = "lz4")] + codec: String, + + /// Zstd compression level (1-22) + #[arg(long, default_value_t = 1)] + zstd_level: i32, + + /// Memory limit in bytes (triggers spilling when exceeded) + #[arg(long)] + memory_limit: Option<usize>, + + /// Number of iterations to run + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Number of warmup iterations before timing + #[arg(long, default_value_t = 0)] + warmup: usize, + + /// Output directory for shuffle data/index files + #[arg(long, default_value = "/tmp/comet_shuffle_bench")] + output_dir: PathBuf, + + /// Write buffer size in bytes + #[arg(long, default_value_t = 1048576)] + write_buffer_size: usize, + + /// Limit rows processed per iteration (0 = no limit) + #[arg(long, default_value_t = 0)] + limit: usize, + + /// Number of concurrent shuffle tasks to simulate executor parallelism. + /// Each task reads the same input and writes to its own output files. 
+ #[arg(long, default_value_t = 1)] + concurrent_tasks: usize, +} + +fn main() { + let args = Args::parse(); + + // Create output directory + fs::create_dir_all(&args.output_dir).expect("Failed to create output directory"); + let data_file = args.output_dir.join("data.out"); + let index_file = args.output_dir.join("index.out"); + + let (schema, total_rows) = read_parquet_metadata(&args.input, args.limit); + + let codec = parse_codec(&args.codec, args.zstd_level); + let hash_col_indices = parse_hash_columns(&args.hash_columns); + + println!("=== Shuffle Benchmark ==="); + println!("Input: {}", args.input.display()); + println!( + "Schema: {} columns ({})", + schema.fields().len(), + describe_schema(&schema) + ); + println!("Total rows: {}", format_number(total_rows as usize)); + println!("Batch size: {}", format_number(args.batch_size)); + println!("Partitioning: {}", args.partitioning); + println!("Partitions: {}", args.partitions); + println!("Codec: {:?}", codec); + println!("Hash columns: {:?}", hash_col_indices); + if let Some(mem_limit) = args.memory_limit { + println!("Memory limit: {}", format_bytes(mem_limit)); + } + if args.concurrent_tasks > 1 { + println!("Concurrent: {} tasks", args.concurrent_tasks); + } + println!( + "Iterations: {} (warmup: {})", + args.iterations, args.warmup + ); + println!(); + + let total_iters = args.warmup + args.iterations; + let mut write_times = Vec::with_capacity(args.iterations); + let mut data_file_sizes = Vec::with_capacity(args.iterations); + let mut last_metrics: Option<MetricsSet> = None; + let mut last_input_metrics: Option<MetricsSet> = None; + + for i in 0..total_iters { + let is_warmup = i < args.warmup; + let label = if is_warmup { + format!("warmup {}/{}", i + 1, args.warmup) + } else { + format!("iter {}/{}", i - args.warmup + 1, args.iterations) + }; + + let (write_elapsed, metrics, input_metrics) = if args.concurrent_tasks > 1 { + let elapsed = run_concurrent_shuffle_writes( + &args.input, + &schema, + &codec, + &hash_col_indices, + 
&args, + ); + (elapsed, None, None) + } else { + run_shuffle_write( + &args.input, + &schema, + &codec, + &hash_col_indices, + &args, + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + ) + }; + let data_size = fs::metadata(&data_file).map(|m| m.len()).unwrap_or(0); + + if !is_warmup { + write_times.push(write_elapsed); + data_file_sizes.push(data_size); + last_metrics = metrics; + last_input_metrics = input_metrics; + } + + print!(" [{label}] write: {:.3}s", write_elapsed); + if args.concurrent_tasks <= 1 { + print!(" output: {}", format_bytes(data_size as usize)); + } + + println!(); + } + + if args.iterations > 0 { + println!(); + println!("=== Results ==="); + + let avg_write = write_times.iter().sum::<f64>() / write_times.len() as f64; + let write_throughput_rows = (total_rows as f64 * args.concurrent_tasks as f64) / avg_write; + + println!("Write:"); + println!(" avg time: {:.3}s", avg_write); + if write_times.len() > 1 { + let min = write_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = write_times + .iter() + .cloned() + .fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {} rows/s (total across {} tasks)", + format_number(write_throughput_rows as usize), + args.concurrent_tasks + ); + if args.concurrent_tasks <= 1 { + let avg_data_size = data_file_sizes.iter().sum::<u64>() / data_file_sizes.len() as u64; + println!( + " output size: {}", + format_bytes(avg_data_size as usize) + ); + } + + if let Some(ref metrics) = last_input_metrics { + println!(); + println!("Input Metrics (last iteration):"); + print_input_metrics(metrics); + } + + if let Some(ref metrics) = last_metrics { + println!(); + println!("Shuffle Metrics (last iteration):"); + print_shuffle_metrics(metrics, avg_write); + } + } + + let _ = fs::remove_file(&data_file); + let _ = fs::remove_file(&index_file); +} + +fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) { + let get_metric = 
|name: &str| -> Option<usize> { + metrics + .iter() + .find(|m| m.value().name() == name) + .map(|m| m.value().as_usize()) + }; + + let total_ns = (total_wall_time_secs * 1e9) as u64; + let fmt_time = |nanos: usize| -> String { + let secs = nanos as f64 / 1e9; + let pct = if total_ns > 0 { + (nanos as f64 / total_ns as f64) * 100.0 + } else { + 0.0 + }; + format!("{:.3}s ({:.1}%)", secs, pct) + }; + + if let Some(input_batches) = get_metric("input_batches") { + println!(" input batches: {}", format_number(input_batches)); + } + if let Some(nanos) = get_metric("repart_time") { + println!(" repart time: {}", fmt_time(nanos)); + } + if let Some(nanos) = get_metric("encode_time") { + println!(" encode time: {}", fmt_time(nanos)); + } + if let Some(nanos) = get_metric("write_time") { + println!(" write time: {}", fmt_time(nanos)); + } + + if let Some(spill_count) = get_metric("spill_count") { + if spill_count > 0 { + println!(" spill count: {}", format_number(spill_count)); + } + } + if let Some(spilled_bytes) = get_metric("spilled_bytes") { + if spilled_bytes > 0 { + println!(" spilled bytes: {}", format_bytes(spilled_bytes)); + } + } + if let Some(data_size) = get_metric("data_size") { + if data_size > 0 { + println!(" data size: {}", format_bytes(data_size)); + } + } +} + +fn print_input_metrics(metrics: &MetricsSet) { + let aggregated = metrics.aggregate_by_name(); + for m in aggregated.iter() { + let value = m.value(); + let name = value.name(); + let v = value.as_usize(); + if v == 0 { + continue; + } + // Format time metrics as seconds, everything else as a number + // Skip start/end timestamps — not useful in benchmark output + if matches!( + value, + MetricValue::StartTimestamp(_) | MetricValue::EndTimestamp(_) + ) { + continue; + } + let is_time = matches!( + value, + MetricValue::ElapsedCompute(_) | MetricValue::Time { .. 
} + ); + if is_time { + println!(" {name}: {:.3}s", v as f64 / 1e9); + } else if name.contains("bytes") || name.contains("size") { + println!(" {name}: {}", format_bytes(v)); + } else { + println!(" {name}: {}", format_number(v)); + } + } +} + +/// Read schema and total row count from Parquet metadata without loading any data. +fn read_parquet_metadata(path: &Path, limit: usize) -> (SchemaRef, u64) { + let paths = collect_parquet_paths(path); + let mut schema = None; + let mut total_rows = 0u64; + + for file_path in &paths { + let file = fs::File::open(file_path) + .unwrap_or_else(|e| panic!("Failed to open {}: {}", file_path.display(), e)); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap_or_else(|e| { + panic!( + "Failed to read Parquet metadata from {}: {}", + file_path.display(), + e + ) + }); + if schema.is_none() { + schema = Some(Arc::clone(builder.schema())); + } + total_rows += builder.metadata().file_metadata().num_rows() as u64; + if limit > 0 && total_rows >= limit as u64 { + total_rows = total_rows.min(limit as u64); + break; + } + } + + (schema.expect("No parquet files found"), total_rows) +} + +fn collect_parquet_paths(path: &Path) -> Vec { + if path.is_dir() { + let mut files: Vec = fs::read_dir(path) + .unwrap_or_else(|e| panic!("Failed to read directory {}: {}", path.display(), e)) + .filter_map(|entry| { + let p = entry.ok()?.path(); + if p.extension().and_then(|e| e.to_str()) == Some("parquet") { + Some(p) + } else { + None + } + }) + .collect(); + files.sort(); + if files.is_empty() { + panic!("No .parquet files found in {}", path.display()); + } + files + } else { + vec![path.to_path_buf()] + } +} + +fn run_shuffle_write( + input_path: &Path, + schema: &SchemaRef, + codec: &CompressionCodec, + hash_col_indices: &[usize], + args: &Args, + data_file: &str, + index_file: &str, +) -> (f64, Option, Option) { + let partitioning = build_partitioning( + &args.partitioning, + args.partitions, + hash_col_indices, + schema, + ); + + 
let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let start = Instant::now(); + let (shuffle_metrics, input_metrics) = execute_shuffle_write( + input_path.to_str().unwrap(), + codec.clone(), + partitioning, + args.batch_size, + args.memory_limit, + args.write_buffer_size, + args.limit, + data_file.to_string(), + index_file.to_string(), + ) + .await + .unwrap(); + ( + start.elapsed().as_secs_f64(), + Some(shuffle_metrics), + Some(input_metrics), + ) + }) +} + +/// Core async shuffle write logic shared by single and concurrent paths. +#[allow(clippy::too_many_arguments)] +async fn execute_shuffle_write( + input_path: &str, + codec: CompressionCodec, + partitioning: CometPartitioning, + batch_size: usize, + memory_limit: Option<usize>, + write_buffer_size: usize, + limit: usize, + data_file: String, + index_file: String, +) -> datafusion::common::Result<(MetricsSet, MetricsSet)> { + let config = SessionConfig::new().with_batch_size(batch_size); + let mut runtime_builder = RuntimeEnvBuilder::new(); + if let Some(mem_limit) = memory_limit { + runtime_builder = runtime_builder.with_memory_limit(mem_limit, 1.0); + } + let runtime_env = Arc::new(runtime_builder.build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + + let mut df = ctx + .read_parquet(input_path, ParquetReadOptions::default()) + .await + .expect("Failed to create Parquet scan"); + if limit > 0 { + df = df.limit(0, Some(limit)).unwrap(); + } + + let parquet_plan = df + .create_physical_plan() + .await + .expect("Failed to create physical plan"); + + let input: Arc<dyn ExecutionPlan> = if parquet_plan + .properties() + .output_partitioning() + .partition_count() + > 1 + { + Arc::new(CoalescePartitionsExec::new(parquet_plan)) + } else { + parquet_plan + }; + + let exec = ShuffleWriterExec::try_new( + input, + partitioning, + codec, + data_file, + index_file, + false, + write_buffer_size, + ) + .expect("Failed to create ShuffleWriterExec"); + + let task_ctx = ctx.task_ctx(); 
let stream = exec.execute(0, task_ctx).unwrap(); + collect(stream).await.unwrap(); + + // Collect metrics from the input plan (Parquet scan + optional coalesce) + let input_metrics = collect_input_metrics(&exec); + + Ok((exec.metrics().unwrap_or_default(), input_metrics)) +} + +/// Walk the plan tree and aggregate all metrics from input plans (everything below shuffle writer). +fn collect_input_metrics(exec: &ShuffleWriterExec) -> MetricsSet { + let mut all_metrics = MetricsSet::new(); + fn gather(plan: &dyn ExecutionPlan, out: &mut MetricsSet) { + if let Some(metrics) = plan.metrics() { + for m in metrics.iter() { + out.push(Arc::clone(m)); + } + } + for child in plan.children() { + gather(child.as_ref(), out); + } + } + for child in exec.children() { + gather(child.as_ref(), &mut all_metrics); + } + all_metrics +} + +/// Run N concurrent shuffle writes to simulate executor parallelism. +/// Returns wall-clock time for all tasks to complete. +fn run_concurrent_shuffle_writes( + input_path: &Path, + schema: &SchemaRef, + codec: &CompressionCodec, + hash_col_indices: &[usize], + args: &Args, +) -> f64 { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let start = Instant::now(); + + let mut handles = Vec::with_capacity(args.concurrent_tasks); + for task_id in 0..args.concurrent_tasks { + let task_dir = args.output_dir.join(format!("task_{task_id}")); + fs::create_dir_all(&task_dir).expect("Failed to create task output directory"); + let data_file = task_dir.join("data.out").to_str().unwrap().to_string(); + let index_file = task_dir.join("index.out").to_str().unwrap().to_string(); + + let input_str = input_path.to_str().unwrap().to_string(); + let codec = codec.clone(); + let partitioning = build_partitioning( + &args.partitioning, + args.partitions, + hash_col_indices, + schema, + ); + let batch_size = args.batch_size; + let memory_limit = args.memory_limit; + let write_buffer_size = args.write_buffer_size; + let limit = args.limit; + + 
handles.push(tokio::spawn(async move { + execute_shuffle_write( + &input_str, + codec, + partitioning, + batch_size, + memory_limit, + write_buffer_size, + limit, + data_file, + index_file, + ) + .await + .unwrap() + })); + } + + for handle in handles { + handle.await.expect("Task panicked"); + } + + for task_id in 0..args.concurrent_tasks { + let task_dir = args.output_dir.join(format!("task_{task_id}")); + let _ = fs::remove_dir_all(&task_dir); + } + + start.elapsed().as_secs_f64() + }) +} + +fn build_partitioning( + scheme: &str, + num_partitions: usize, + hash_col_indices: &[usize], + schema: &SchemaRef, +) -> CometPartitioning { + match scheme { + "single" => CometPartitioning::SinglePartition, + "round-robin" => CometPartitioning::RoundRobin(num_partitions, 0), + "hash" => { + let exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>> = hash_col_indices + .iter() + .map(|&idx| { + let field = schema.field(idx); + Arc::new(Column::new(field.name(), idx)) + as Arc<dyn datafusion::physical_plan::PhysicalExpr> + }) + .collect(); + CometPartitioning::Hash(exprs, num_partitions) + } + other => { + eprintln!("Unknown partitioning scheme: {other}. Using hash."); + build_partitioning("hash", num_partitions, hash_col_indices, schema) + } + } +} + +fn parse_codec(codec: &str, zstd_level: i32) -> CompressionCodec { + match codec.to_lowercase().as_str() { + "none" => CompressionCodec::None, + "lz4" => CompressionCodec::Lz4Frame, + "zstd" => CompressionCodec::Zstd(zstd_level), + "snappy" => CompressionCodec::Snappy, + other => { + eprintln!("Unknown codec: {other}. 
Using zstd."); + CompressionCodec::Zstd(zstd_level) + } + } +} + +fn parse_hash_columns(s: &str) -> Vec<usize> { + s.split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.trim().parse::<usize>().expect("Invalid column index")) + .collect() +} + +fn describe_schema(schema: &arrow::datatypes::Schema) -> String { + let mut counts = std::collections::HashMap::new(); + for field in schema.fields() { + let type_name = match field.data_type() { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => "int", + DataType::Float16 | DataType::Float32 | DataType::Float64 => "float", + DataType::Utf8 | DataType::LargeUtf8 => "string", + DataType::Boolean => "bool", + DataType::Date32 | DataType::Date64 => "date", + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => "decimal", + DataType::Timestamp(_, _) => "timestamp", + DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => "binary", + _ => "other", + }; + *counts.entry(type_name).or_insert(0) += 1; + } + let mut parts: Vec<String> = counts + .into_iter() + .map(|(k, v)| format!("{}x{}", v, k)) + .collect(); + parts.sort(); + parts.join(", ") +} + +fn format_number(n: usize) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() +} + +fn format_bytes(bytes: usize) -> String { + if bytes >= 1024 * 1024 * 1024 { + format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) + } else if bytes >= 1024 * 1024 { + format!("{:.2} MiB", bytes as f64 / (1024.0 * 1024.0)) + } else if bytes >= 1024 { + format!("{:.2} KiB", bytes as f64 / 1024.0) + } else { + format!("{bytes} B") + } +} diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 7de9314f54..0113355e45 100644 --- 
a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -20,7 +20,7 @@ use crate::partitioners::partitioned_batch_iterator::{ PartitionedBatchIterator, PartitionedBatchesProducer, }; use crate::partitioners::ShufflePartitioner; -use crate::writers::{BufBatchWriter, PartitionWriter}; +use crate::writers::{BufBatchWriter, PartitionSpillRange, PartitionWriter, SpillInfo}; use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; @@ -125,6 +125,9 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, + /// Combined spill files. Each entry is a single file containing data from + /// multiple partitions, created during one spill event. + spill_infos: Vec<SpillInfo>, } impl MultiPartitionShuffleRepartitioner { @@ -190,6 +193,7 @@ impl MultiPartitionShuffleRepartitioner { reservation, tracing_enabled, write_buffer_size, + spill_infos: vec![], }) } @@ -502,20 +506,55 @@ impl MultiPartitionShuffleRepartitioner { with_trace("shuffle_spill", self.tracing_enabled, || { let num_output_partitions = self.partition_writers.len(); let mut partitioned_batches = self.partitioned_batches(); - let mut spilled_bytes = 0; + let mut spilled_bytes: usize = 0; + + // Create a single temporary file for this spill event + let temp_file = self + .runtime + .disk_manager + .create_tmp_file("shuffle writer spill")?; + let mut spill_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(temp_file.path()) + .map_err(|e| { + DataFusionError::Execution(format!("Error occurred while spilling {e}")) + })?; + + let mut partition_ranges = Vec::with_capacity(num_output_partitions); for partition_id in 0..num_output_partitions { let partition_writer = &mut self.partition_writers[partition_id]; let mut iter = 
partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill( + + let offset = spill_file.stream_position()?; + partition_writer.write_to( &mut iter, - &self.runtime, + &mut spill_file, &self.metrics, self.write_buffer_size, self.batch_size, )?; + let end_offset = spill_file.stream_position()?; + let actual_bytes = (end_offset - offset) as usize; + + if actual_bytes > 0 { + partition_ranges.push(Some(PartitionSpillRange { + offset, + length: actual_bytes as u64, + })); + spilled_bytes += actual_bytes; + } else { + partition_ranges.push(None); + } } + spill_file.flush()?; + + self.spill_infos + .push(SpillInfo::new(temp_file, partition_ranges)); + self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); @@ -524,8 +563,8 @@ impl MultiPartitionShuffleRepartitioner { } #[cfg(test)] - pub(crate) fn partition_writers(&self) -> &[PartitionWriter] { - &self.partition_writers + pub(crate) fn spill_count_files(&self) -> usize { + self.spill_infos.len() } } @@ -575,18 +614,24 @@ let mut output_data = BufWriter::new(output_data); + // Pre-open all spill files once to avoid repeated File::open() calls. + // With N partitions and S spill files, this reduces open() calls from + // N*S to S. + let mut spill_handles: Vec<_> = self + .spill_infos + .iter() + .map(|info| info.open_for_read()) + .collect::<Result<Vec<_>>>()?; + #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(spill_path) = self.partition_writers[i].path() { - // Use raw File handle (not BufReader) so that std::io::copy - // can use copy_file_range/sendfile for zero-copy on Linux. 
- let mut spill_file = File::open(spill_path)?; + // Copy spilled data for this partition from each spill file + // using pre-opened file handles. + for (spill_info, handle) in self.spill_infos.iter().zip(spill_handles.iter_mut()) { let mut write_timer = self.metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut output_data)?; + spill_info.copy_partition_with_handle(i, handle, &mut output_data)?; write_timer.stop(); } diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index e649aaac69..1bfe15c0c6 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -367,25 +367,20 @@ mod test { repartitioner.insert_batch(batch.clone()).await.unwrap(); - { - let partition_writers = repartitioner.partition_writers(); - assert_eq!(partition_writers.len(), 2); - - assert!(!partition_writers[0].has_spill_file()); - assert!(!partition_writers[1].has_spill_file()); - } + // before spill, no spill files should exist + assert_eq!(repartitioner.spill_count_files(), 0); repartitioner.spill().unwrap(); - // after spill, there should be spill files - { - let partition_writers = repartitioner.partition_writers(); - assert!(partition_writers[0].has_spill_file()); - assert!(partition_writers[1].has_spill_file()); - } + // after spill, exactly one combined spill file should exist (not one per partition) + assert_eq!(repartitioner.spill_count_files(), 1); // insert another batch after spilling repartitioner.insert_batch(batch.clone()).await.unwrap(); + + // spill again -- should create a second combined spill file + repartitioner.spill().unwrap(); + assert_eq!(repartitioner.spill_count_files(), 2); } fn create_runtime(memory_limit: usize) -> Arc { @@ -693,4 +688,257 @@ mod test { } total_rows } + + /// Verify that spilling an empty repartitioner produces no spill files. 
+ #[tokio::test] + async fn spill_empty_buffers_produces_no_file() { + let batch = create_batch(100); + let memory_limit = 512 * 1024; + let num_partitions = 4; + let runtime_env = create_runtime(memory_limit); + let metrics_set = ExecutionPlanMetricsSet::new(); + let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( + 0, + "/tmp/spill_empty_data.out".to_string(), + "/tmp/spill_empty_index.out".to_string(), + batch.schema(), + CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), + ShufflePartitionerMetrics::new(&metrics_set, 0), + runtime_env, + 1024, + CompressionCodec::Lz4Frame, + false, + 1024 * 1024, + ) + .unwrap(); + + // Spill with no data inserted -- should be a no-op + repartitioner.spill().unwrap(); + assert_eq!(repartitioner.spill_count_files(), 0); + } + + /// Verify that spilling with many partitions (some empty) still creates + /// exactly one spill file per spill event, and that shuffle_write succeeds. + #[test] + #[cfg_attr(miri, ignore)] + fn test_spill_with_sparse_partitions() { + // 100 rows across 50 partitions -- many partitions will be empty + shuffle_write_test(100, 5, 50, Some(10 * 1024)); + } + + /// Verify that the output of a spill-based shuffle contains the same total + /// row count and valid partition structure as a non-spill shuffle. 
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn test_spill_output_matches_non_spill() {
+        use std::fs;
+
+        let batch_size = 1000;
+        let num_batches = 10;
+        let num_partitions = 8;
+        let total_rows = batch_size * num_batches;
+
+        let batch = create_batch(batch_size);
+        let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();
+
+        let parse_offsets = |index_data: &[u8]| -> Vec<i64> {
+            index_data
+                .chunks(8)
+                .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
+                .collect()
+        };
+
+        let count_rows_in_partition = |data: &[u8], start: i64, end: i64| -> usize {
+            if start == end {
+                return 0;
+            }
+            read_all_ipc_blocks(&data[start as usize..end as usize])
+        };
+
+        // Run 1: no spilling (large memory limit)
+        {
+            let partitions = std::slice::from_ref(&batches);
+            let exec = ShuffleWriterExec::try_new(
+                Arc::new(DataSourceExec::new(Arc::new(
+                    MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
+                ))),
+                CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
+                CompressionCodec::Zstd(1),
+                "/tmp/no_spill_data.out".to_string(),
+                "/tmp/no_spill_index.out".to_string(),
+                false,
+                1024 * 1024,
+            )
+            .unwrap();
+
+            let config = SessionConfig::new();
+            let runtime_env = Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_limit(100 * 1024 * 1024, 1.0)
+                    .build()
+                    .unwrap(),
+            );
+            let ctx = SessionContext::new_with_config_rt(config, runtime_env);
+            let task_ctx = ctx.task_ctx();
+            let stream = exec.execute(0, task_ctx).unwrap();
+            let rt = Runtime::new().unwrap();
+            rt.block_on(collect(stream)).unwrap();
+        }
+
+        // Run 2: with spilling (memory limit forces spilling during insert_batch)
+        {
+            let partitions = std::slice::from_ref(&batches);
+            let exec = ShuffleWriterExec::try_new(
+                Arc::new(DataSourceExec::new(Arc::new(
+                    MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
+                ))),
+                CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
+                CompressionCodec::Zstd(1),
+                "/tmp/spill_data.out".to_string(),
+                "/tmp/spill_index.out".to_string(),
+                false,
+                1024 * 1024,
+            )
+            .unwrap();
+
+            let config = SessionConfig::new();
+            let runtime_env = Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_limit(512 * 1024, 1.0)
+                    .build()
+                    .unwrap(),
+            );
+            let ctx = SessionContext::new_with_config_rt(config, runtime_env);
+            let task_ctx = ctx.task_ctx();
+            let stream = exec.execute(0, task_ctx).unwrap();
+            let rt = Runtime::new().unwrap();
+            rt.block_on(collect(stream)).unwrap();
+        }
+
+        let no_spill_index = fs::read("/tmp/no_spill_index.out").unwrap();
+        let spill_index = fs::read("/tmp/spill_index.out").unwrap();
+
+        assert_eq!(
+            no_spill_index.len(),
+            spill_index.len(),
+            "Index files should have the same number of entries"
+        );
+
+        let no_spill_offsets = parse_offsets(&no_spill_index);
+        let spill_offsets = parse_offsets(&spill_index);
+
+        let no_spill_data = fs::read("/tmp/no_spill_data.out").unwrap();
+        let spill_data = fs::read("/tmp/spill_data.out").unwrap();
+
+        // Verify row counts per partition match between spill and non-spill runs
+        let mut no_spill_total_rows = 0;
+        let mut spill_total_rows = 0;
+        for i in 0..num_partitions {
+            let ns_rows = count_rows_in_partition(
+                &no_spill_data,
+                no_spill_offsets[i],
+                no_spill_offsets[i + 1],
+            );
+            let s_rows =
+                count_rows_in_partition(&spill_data, spill_offsets[i], spill_offsets[i + 1]);
+            assert_eq!(
+                ns_rows, s_rows,
+                "Partition {i} row count mismatch: no_spill={ns_rows}, spill={s_rows}"
+            );
+            no_spill_total_rows += ns_rows;
+            spill_total_rows += s_rows;
+        }
+
+        assert_eq!(
+            no_spill_total_rows, total_rows,
+            "Non-spill total row count mismatch"
+        );
+        assert_eq!(
+            spill_total_rows, total_rows,
+            "Spill total row count mismatch"
+        );
+
+        // Cleanup
+        let _ = fs::remove_file("/tmp/no_spill_data.out");
+        let _ = fs::remove_file("/tmp/no_spill_index.out");
+        let _ = fs::remove_file("/tmp/spill_data.out");
+        let _ = fs::remove_file("/tmp/spill_index.out");
+    }
+
+    /// Verify
multiple spill events with subsequent insert_batch calls
+    /// produce correct output.
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_multiple_spills_then_write() {
+        let batch = create_batch(500);
+        let memory_limit = 512 * 1024;
+        let num_partitions = 4;
+        let runtime_env = create_runtime(memory_limit);
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new(
+            0,
+            "/tmp/multi_spill_data.out".to_string(),
+            "/tmp/multi_spill_index.out".to_string(),
+            batch.schema(),
+            CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
+            ShufflePartitionerMetrics::new(&metrics_set, 0),
+            runtime_env,
+            1024,
+            CompressionCodec::Lz4Frame,
+            false,
+            1024 * 1024,
+        )
+        .unwrap();
+
+        // Insert -> spill -> insert -> spill -> insert (3 inserts, 2 spills)
+        repartitioner.insert_batch(batch.clone()).await.unwrap();
+        repartitioner.spill().unwrap();
+        assert_eq!(repartitioner.spill_count_files(), 1);
+
+        repartitioner.insert_batch(batch.clone()).await.unwrap();
+        repartitioner.spill().unwrap();
+        assert_eq!(repartitioner.spill_count_files(), 2);
+
+        repartitioner.insert_batch(batch.clone()).await.unwrap();
+        // Final shuffle_write merges 2 spill files + in-memory data
+        repartitioner.shuffle_write().unwrap();
+
+        // Verify output files exist and are non-empty
+        let data = std::fs::read("/tmp/multi_spill_data.out").unwrap();
+        assert!(!data.is_empty(), "Output data file should be non-empty");
+
+        let index = std::fs::read("/tmp/multi_spill_index.out").unwrap();
+        // Index should have (num_partitions + 1) * 8 bytes
+        assert_eq!(
+            index.len(),
+            (num_partitions + 1) * 8,
+            "Index file should have correct number of offset entries"
+        );
+
+        // Parse offsets and verify they are monotonically non-decreasing
+        let offsets: Vec<i64> = index
+            .chunks(8)
+            .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
+            .collect();
+        assert_eq!(offsets[0], 0, "First offset should be 0");
+        for i in
1..offsets.len() { + assert!( + offsets[i] >= offsets[i - 1], + "Offsets must be monotonically non-decreasing: offset[{}]={} < offset[{}]={}", + i, + offsets[i], + i - 1, + offsets[i - 1] + ); + } + assert_eq!( + *offsets.last().unwrap() as usize, + data.len(), + "Last offset should equal data file size" + ); + + // Cleanup + let _ = std::fs::remove_file("/tmp/multi_spill_data.out"); + let _ = std::fs::remove_file("/tmp/multi_spill_index.out"); + } } diff --git a/native/shuffle/src/writers/mod.rs b/native/shuffle/src/writers/mod.rs index 75caf9f3a3..06f9622c12 100644 --- a/native/shuffle/src/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -23,4 +23,4 @@ mod spill; pub(crate) use buf_batch_writer::BufBatchWriter; pub(crate) use checksum::Checksum; pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter}; -pub(crate) use spill::PartitionWriter; +pub(crate) use spill::{PartitionSpillRange, PartitionWriter, SpillInfo}; diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/spill.rs index c16caddbf9..093b9ce6b7 100644 --- a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/spill.rs @@ -21,22 +21,76 @@ use crate::partitioners::PartitionedBatchIterator; use crate::writers::buf_batch_writer::BufBatchWriter; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; -use datafusion::execution::runtime_env::RuntimeEnv; -use std::fs::{File, OpenOptions}; +use std::fs::File; +use std::io::{Read, Seek, SeekFrom, Write}; -/// A temporary disk file for spilling a partition's intermediate shuffle data. -struct SpillFile { - temp_file: RefCountedTempFile, - file: File, +/// The byte range of a single partition's data within a combined spill file. +#[derive(Debug, Clone)] +pub(crate) struct PartitionSpillRange { + pub offset: u64, + pub length: u64, } -/// Manages encoding and optional disk spilling for a single shuffle partition. 
+/// Represents a single spill file that contains data from multiple partitions.
+/// Data is written sequentially ordered by partition ID. Each partition's byte
+/// range is tracked in `partition_ranges` so it can be read back during merge.
+pub(crate) struct SpillInfo {
+    /// The temporary file handle -- kept alive to prevent cleanup until we are done.
+    _temp_file: RefCountedTempFile,
+    /// Path to the spill file on disk.
+    path: std::path::PathBuf,
+    /// Byte range for each partition. None means the partition had no data in this spill.
+    pub partition_ranges: Vec<Option<PartitionSpillRange>>,
+}
+
+impl SpillInfo {
+    pub(crate) fn new(
+        temp_file: RefCountedTempFile,
+        partition_ranges: Vec<Option<PartitionSpillRange>>,
+    ) -> Self {
+        let path = temp_file.path().to_path_buf();
+        Self {
+            _temp_file: temp_file,
+            path,
+            partition_ranges,
+        }
+    }
+
+    /// Copy the data for `partition_id` using a pre-opened file handle.
+    /// Avoids repeated File::open() calls when iterating over partitions.
+    /// Returns the number of bytes copied.
+    pub(crate) fn copy_partition_with_handle(
+        &self,
+        partition_id: usize,
+        spill_file: &mut File,
+        output: &mut impl Write,
+    ) -> datafusion::common::Result<u64> {
+        if let Some(ref range) = self.partition_ranges[partition_id] {
+            if range.length == 0 {
+                return Ok(0);
+            }
+            spill_file.seek(SeekFrom::Start(range.offset))?;
+            let mut limited = Read::take(spill_file, range.length);
+            let copied = std::io::copy(&mut limited, output)?;
+            Ok(copied)
+        } else {
+            Ok(0)
+        }
+    }
+
+    /// Open the spill file for reading. The returned handle can be reused
+    /// across multiple copy_partition_with_handle() calls.
+    pub(crate) fn open_for_read(&self) -> datafusion::common::Result<File> {
+        File::open(&self.path).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to open spill file for reading: {e}"))
+        })
+    }
+}
+
+/// Manages encoding for a single shuffle partition. Does not own any spill file --
+/// spill files are managed at the repartitioner level as combined SpillInfo objects.
 pub(crate) struct PartitionWriter {
-    /// Spill file for intermediate shuffle output for this partition. Each spill event
-    /// will append to this file and the contents will be copied to the shuffle file at
-    /// the end of processing.
-    spill_file: Option<SpillFile>,
-    /// Writer that performs encoding and compression
+    /// Writer that performs encoding and compression.
     shuffle_block_writer: ShuffleBlockWriter,
 }
 
@@ -45,51 +99,25 @@ impl PartitionWriter {
         shuffle_block_writer: ShuffleBlockWriter,
     ) -> datafusion::common::Result<Self> {
         Ok(Self {
-            spill_file: None,
             shuffle_block_writer,
         })
     }
 
-    fn ensure_spill_file_created(
-        &mut self,
-        runtime: &RuntimeEnv,
-    ) -> datafusion::common::Result<()> {
-        if self.spill_file.is_none() {
-            // Spill file is not yet created, create it
-            let spill_file = runtime
-                .disk_manager
-                .create_tmp_file("shuffle writer spill")?;
-            let spill_data = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(spill_file.path())
-                .map_err(|e| {
-                    DataFusionError::Execution(format!("Error occurred while spilling {e}"))
-                })?;
-            self.spill_file = Some(SpillFile {
-                temp_file: spill_file,
-                file: spill_data,
-            });
-        }
-        Ok(())
-    }
-
-    pub(crate) fn spill(
+    /// Encode and write a partition's batches to the provided writer.
+    /// Returns the number of bytes written.
+    pub(crate) fn write_to<W: Write>(
         &mut self,
         iter: &mut PartitionedBatchIterator,
-        runtime: &RuntimeEnv,
+        writer: &mut W,
         metrics: &ShufflePartitionerMetrics,
         write_buffer_size: usize,
         batch_size: usize,
     ) -> datafusion::common::Result<usize> {
         if let Some(batch) = iter.next() {
-            self.ensure_spill_file_created(runtime)?;
-
             let total_bytes_written = {
                 let mut buf_batch_writer = BufBatchWriter::new(
                     &mut self.shuffle_block_writer,
-                    &mut self.spill_file.as_mut().unwrap().file,
+                    writer,
                     write_buffer_size,
                     batch_size,
                 );
@@ -106,21 +134,9 @@ impl PartitionWriter {
                 buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
                 bytes_written
             };
-
             Ok(total_bytes_written)
         } else {
             Ok(0)
         }
     }
-
-    pub(crate) fn path(&self) -> Option<&std::path::Path> {
-        self.spill_file
-            .as_ref()
-            .map(|spill_file| spill_file.temp_file.path())
-    }
-
-    #[cfg(test)]
-    pub(crate) fn has_spill_file(&self) -> bool {
-        self.spill_file.is_some()
-    }
 }