Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "datafusion-datasource"
description = "datafusion-datasource"
readme = "README.md"
authors.workspace = true
edition.workspace = true
edition = "2024"
homepage.workspace = true
license.workspace = true
repository.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_datasource::{generate_test_files, verify_sort_integrity};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};

pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
let file_schema = Arc::new(Schema::new(vec![Field::new(
Expand Down
28 changes: 15 additions & 13 deletions datafusion/datasource/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use arrow::error::ArrowError;
use bytes::Buf;
use bytes::Bytes;
use datafusion_common::Result;
use futures::stream::BoxStream;
use futures::StreamExt as _;
use futures::{ready, Stream};
use futures::stream::BoxStream;
use futures::{Stream, ready};
use std::collections::VecDeque;
use std::fmt;
use std::task::Poll;
Expand Down Expand Up @@ -175,17 +175,19 @@ pub fn deserialize_stream<'a>(
mut input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
mut deserializer: impl BatchDeserializer<Bytes> + 'a,
) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
futures::stream::poll_fn(move |cx| loop {
match ready!(input.poll_next_unpin(cx)).transpose()? {
Some(b) => _ = deserializer.digest(b),
None => deserializer.finish(),
};

return match deserializer.next()? {
DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
DeserializerOutput::InputExhausted => Poll::Ready(None),
DeserializerOutput::RequiresMoreData => continue,
};
futures::stream::poll_fn(move |cx| {
loop {
match ready!(input.poll_next_unpin(cx)).transpose()? {
Some(b) => _ = deserializer.digest(b),
None => deserializer.finish(),
};

return match deserializer.next()? {
DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
DeserializerOutput::InputExhausted => Poll::Ready(None),
DeserializerOutput::RequiresMoreData => continue,
};
}
})
.boxed()
}
2 changes: 1 addition & 1 deletion datafusion/datasource/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ mod tests {
use super::*;

use datafusion_physical_plan::{DefaultDisplay, VerboseDisplay};
use object_store::{path::Path, ObjectMeta};
use object_store::{ObjectMeta, path::Path};

use crate::PartitionedFile;
use chrono::Utc;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, Result};
use datafusion_common::{Result, not_impl_err};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

use object_store::ObjectStore;

Expand Down
12 changes: 6 additions & 6 deletions datafusion/datasource/src/file_compression_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::str::FromStr;

use datafusion_common::error::{DataFusionError, Result};

use datafusion_common::parsers::CompressionTypeVariant::{self, *};
use datafusion_common::GetExt;
use datafusion_common::parsers::CompressionTypeVariant::{self, *};

#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
Expand All @@ -39,10 +39,10 @@ use bytes::Bytes;
use bzip2::read::MultiBzDecoder;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use futures::stream::BoxStream;
#[cfg(feature = "compression")]
use liblzma::read::XzDecoder;
use object_store::buffered::BufWriter;
Expand Down Expand Up @@ -148,7 +148,7 @@ impl FileCompressionType {
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
));
}
UNCOMPRESSED => s.boxed(),
})
Expand All @@ -173,7 +173,7 @@ impl FileCompressionType {
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
));
}
UNCOMPRESSED => Box::new(w),
})
Expand Down Expand Up @@ -210,7 +210,7 @@ impl FileCompressionType {
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
));
}
UNCOMPRESSED => s.boxed(),
})
Expand All @@ -237,7 +237,7 @@ impl FileCompressionType {
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
));
}
UNCOMPRESSED => Box::new(r),
})
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::file_sink_config::FileSinkConfig;

use arrow::datatypes::SchemaRef;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
use datafusion_common::{GetExt, Result, Statistics, internal_err, not_impl_err};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_session::Session;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::{FileRange, PartitionedFile};
use datafusion_common::Statistics;
use itertools::Itertools;
use std::cmp::{min, Ordering};
use std::cmp::{Ordering, min};
use std::collections::BinaryHeap;
use std::iter::repeat_with;
use std::mem;
Expand Down
74 changes: 44 additions & 30 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,32 @@

use crate::file_groups::FileGroup;
use crate::{
display::FileGroupsDisplay, file::FileSource,
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
file_compression_type::FileCompressionType, file_stream::FileStream,
source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
source::DataSource, statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
internal_datafusion_err, internal_err, Constraints, Result, ScalarValue, Statistics,
Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
};
use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
DisplayAs, DisplayFormatType,
display::{ProjectSchemaDisplay, display_orderings},
filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet,
DisplayAs, DisplayFormatType,
};
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};

Expand Down Expand Up @@ -303,7 +303,9 @@ impl FileScanConfigBuilder {
match self.clone().with_projection_indices(indices) {
Ok(builder) => builder,
Err(e) => {
warn!("Failed to push down projection in FileScanConfigBuilder::with_projection: {e}");
warn!(
"Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
);
self
}
}
Expand Down Expand Up @@ -643,16 +645,16 @@ impl DataSource for FileScanConfig {
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
// Get statistics for a specific partition
if let Some(file_group) = self.file_groups.get(partition) {
if let Some(stat) = file_group.file_statistics(None) {
// Project the statistics based on the projection
let output_schema = self.projected_schema()?;
return if let Some(projection) = self.file_source.projection() {
projection.project_statistics(stat.clone(), &output_schema)
} else {
Ok(stat.clone())
};
}
if let Some(file_group) = self.file_groups.get(partition)
&& let Some(stat) = file_group.file_statistics(None)
{
// Project the statistics based on the projection
let output_schema = self.projected_schema()?;
return if let Some(projection) = self.file_source.projection() {
projection.project_statistics(stat.clone(), &output_schema)
} else {
Ok(stat.clone())
};
}
// If no statistics available for this partition, return unknown
Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
Expand Down Expand Up @@ -1217,16 +1219,16 @@ mod tests {
use std::collections::HashMap;

use super::*;
use crate::test_util::col;
use crate::TableSchema;
use crate::test_util::col;
use crate::{
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
verify_sort_integrity,
};

use arrow::datatypes::Field;
use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, ColumnStatistics};
use datafusion_common::{ColumnStatistics, internal_err};
use datafusion_expr::{Operator, SortExpr};
use datafusion_physical_expr::create_physical_sort_expr;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
Expand Down Expand Up @@ -1267,7 +1269,7 @@ mod tests {
use chrono::TimeZone;
use datafusion_common::DFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use object_store::{path::Path, ObjectMeta};
use object_store::{ObjectMeta, path::Path};

struct File {
name: &'static str,
Expand Down Expand Up @@ -1368,12 +1370,16 @@ mod tests {
true,
)]),
files: vec![
File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
File::new_nullable(
"0",
"2023-01-01",
vec![Some((Some(0.00), Some(0.49)))],
),
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "nullable sort columns, nulls first",
Expand All @@ -1384,11 +1390,15 @@ mod tests {
)]),
files: vec![
File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
File::new_nullable(
"1",
"2023-01-01",
vec![Some((Some(0.50), Some(1.00)))],
),
File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
],
sort: vec![col("value").sort(true, true)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "all three non-overlapping",
Expand Down Expand Up @@ -1444,7 +1454,9 @@ mod tests {
File::new("2", "2023-01-02", vec![None]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
expected_result: Err(
"construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
),
},
];

Expand Down Expand Up @@ -1621,10 +1633,12 @@ mod tests {
"test.parquet".to_string(),
1024,
)])])
.with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
Column::new("date", 0),
))]
.into()])
.with_output_ordering(vec![
[PhysicalSortExpr::new_default(Arc::new(Column::new(
"date", 0,
)))]
.into(),
])
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.with_newlines_in_values(true)
.build();
Expand Down
4 changes: 2 additions & 2 deletions datafusion/datasource/src/file_sink_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

use std::sync::Arc;

use crate::ListingTableUrl;
use crate::file_groups::FileGroup;
use crate::sink::DataSink;
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::ListingTableUrl;
use crate::write::demux::{DemuxedStreamReceiver, start_demuxer_task};

use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::Result;
Expand Down
Loading