Skip to content

Commit 2a08013

Browse files
authored
chore: update 6 crates to rust edition 2024 (#19196)
## Which issue does this PR close?

This addresses part of #15804 but does not close it.

## Rationale for this change

Now that we are on MSRV 1.88, we can use Rust edition 2024, which brings let chains and other nice features. It also improves `unsafe` checking. To introduce these changes gradually, rather than in one massive PR that is too difficult to manage, we are updating a few crates at a time.

## What changes are included in this PR?

Updates these crates to edition 2024:

- datafusion-datasource
- datafusion-functions-nested
- datafusion-optimizer
- datafusion-physical-plan
- datafusion-session
- datafusion-sql

## Are these changes tested?

Existing unit tests. There are no functional code changes.

## Are there any user-facing changes?

None.

## Note

It is recommended to review with the "ignore whitespace" setting: https://github.com/apache/datafusion/pull/19196/files?w=1
1 parent 79a2f5e commit 2a08013

File tree

212 files changed

+1971
-1716
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

212 files changed

+1971
-1716
lines changed

datafusion/datasource/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ name = "datafusion-datasource"
2020
description = "datafusion-datasource"
2121
readme = "README.md"
2222
authors.workspace = true
23-
edition.workspace = true
23+
edition = "2024"
2424
homepage.workspace = true
2525
license.workspace = true
2626
repository.workspace = true

datafusion/datasource/benches/split_groups_by_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion_datasource::{generate_test_files, verify_sort_integrity};
2424
use datafusion_physical_expr::expressions::Column;
2525
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
2626

27-
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
27+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
2828

2929
pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
3030
let file_schema = Arc::new(Schema::new(vec![Field::new(

datafusion/datasource/src/decoder.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ use arrow::error::ArrowError;
2424
use bytes::Buf;
2525
use bytes::Bytes;
2626
use datafusion_common::Result;
27-
use futures::stream::BoxStream;
2827
use futures::StreamExt as _;
29-
use futures::{ready, Stream};
28+
use futures::stream::BoxStream;
29+
use futures::{Stream, ready};
3030
use std::collections::VecDeque;
3131
use std::fmt;
3232
use std::task::Poll;
@@ -175,17 +175,19 @@ pub fn deserialize_stream<'a>(
175175
mut input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
176176
mut deserializer: impl BatchDeserializer<Bytes> + 'a,
177177
) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
178-
futures::stream::poll_fn(move |cx| loop {
179-
match ready!(input.poll_next_unpin(cx)).transpose()? {
180-
Some(b) => _ = deserializer.digest(b),
181-
None => deserializer.finish(),
182-
};
183-
184-
return match deserializer.next()? {
185-
DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
186-
DeserializerOutput::InputExhausted => Poll::Ready(None),
187-
DeserializerOutput::RequiresMoreData => continue,
188-
};
178+
futures::stream::poll_fn(move |cx| {
179+
loop {
180+
match ready!(input.poll_next_unpin(cx)).transpose()? {
181+
Some(b) => _ = deserializer.digest(b),
182+
None => deserializer.finish(),
183+
};
184+
185+
return match deserializer.next()? {
186+
DeserializerOutput::RecordBatch(rb) => Poll::Ready(Some(Ok(rb))),
187+
DeserializerOutput::InputExhausted => Poll::Ready(None),
188+
DeserializerOutput::RequiresMoreData => continue,
189+
};
190+
}
189191
})
190192
.boxed()
191193
}

datafusion/datasource/src/display.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ mod tests {
135135
use super::*;
136136

137137
use datafusion_physical_plan::{DefaultDisplay, VerboseDisplay};
138-
use object_store::{path::Path, ObjectMeta};
138+
use object_store::{ObjectMeta, path::Path};
139139

140140
use crate::PartitionedFile;
141141
use chrono::Utc;

datafusion/datasource/src/file.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
2828
use crate::schema_adapter::SchemaAdapterFactory;
2929
use datafusion_common::config::ConfigOptions;
30-
use datafusion_common::{not_impl_err, Result};
30+
use datafusion_common::{Result, not_impl_err};
3131
use datafusion_physical_expr::projection::ProjectionExprs;
3232
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
33+
use datafusion_physical_plan::DisplayFormatType;
3334
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
3435
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
35-
use datafusion_physical_plan::DisplayFormatType;
3636

3737
use object_store::ObjectStore;
3838

datafusion/datasource/src/file_compression_type.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use std::str::FromStr;
2121

2222
use datafusion_common::error::{DataFusionError, Result};
2323

24-
use datafusion_common::parsers::CompressionTypeVariant::{self, *};
2524
use datafusion_common::GetExt;
25+
use datafusion_common::parsers::CompressionTypeVariant::{self, *};
2626

2727
#[cfg(feature = "compression")]
2828
use async_compression::tokio::bufread::{
@@ -39,10 +39,10 @@ use bytes::Bytes;
3939
use bzip2::read::MultiBzDecoder;
4040
#[cfg(feature = "compression")]
4141
use flate2::read::MultiGzDecoder;
42-
use futures::stream::BoxStream;
4342
use futures::StreamExt;
4443
#[cfg(feature = "compression")]
4544
use futures::TryStreamExt;
45+
use futures::stream::BoxStream;
4646
#[cfg(feature = "compression")]
4747
use liblzma::read::XzDecoder;
4848
use object_store::buffered::BufWriter;
@@ -148,7 +148,7 @@ impl FileCompressionType {
148148
GZIP | BZIP2 | XZ | ZSTD => {
149149
return Err(DataFusionError::NotImplemented(
150150
"Compression feature is not enabled".to_owned(),
151-
))
151+
));
152152
}
153153
UNCOMPRESSED => s.boxed(),
154154
})
@@ -173,7 +173,7 @@ impl FileCompressionType {
173173
GZIP | BZIP2 | XZ | ZSTD => {
174174
return Err(DataFusionError::NotImplemented(
175175
"Compression feature is not enabled".to_owned(),
176-
))
176+
));
177177
}
178178
UNCOMPRESSED => Box::new(w),
179179
})
@@ -210,7 +210,7 @@ impl FileCompressionType {
210210
GZIP | BZIP2 | XZ | ZSTD => {
211211
return Err(DataFusionError::NotImplemented(
212212
"Compression feature is not enabled".to_owned(),
213-
))
213+
));
214214
}
215215
UNCOMPRESSED => s.boxed(),
216216
})
@@ -237,7 +237,7 @@ impl FileCompressionType {
237237
GZIP | BZIP2 | XZ | ZSTD => {
238238
return Err(DataFusionError::NotImplemented(
239239
"Compression feature is not enabled".to_owned(),
240-
))
240+
));
241241
}
242242
UNCOMPRESSED => Box::new(r),
243243
})

datafusion/datasource/src/file_format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::file_sink_config::FileSinkConfig;
3030

3131
use arrow::datatypes::SchemaRef;
3232
use datafusion_common::file_options::file_type::FileType;
33-
use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
33+
use datafusion_common::{GetExt, Result, Statistics, internal_err, not_impl_err};
3434
use datafusion_physical_expr::LexRequirement;
3535
use datafusion_physical_plan::ExecutionPlan;
3636
use datafusion_session::Session;

datafusion/datasource/src/file_groups.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use crate::{FileRange, PartitionedFile};
2121
use datafusion_common::Statistics;
2222
use itertools::Itertools;
23-
use std::cmp::{min, Ordering};
23+
use std::cmp::{Ordering, min};
2424
use std::collections::BinaryHeap;
2525
use std::iter::repeat_with;
2626
use std::mem;

datafusion/datasource/src/file_scan_config.rs

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,32 @@
2020
2121
use crate::file_groups::FileGroup;
2222
use crate::{
23-
display::FileGroupsDisplay, file::FileSource,
23+
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
2424
file_compression_type::FileCompressionType, file_stream::FileStream,
25-
source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
25+
source::DataSource, statistics::MinMaxStatistics,
2626
};
2727
use arrow::datatypes::FieldRef;
2828
use arrow::datatypes::{DataType, Schema, SchemaRef};
2929
use datafusion_common::config::ConfigOptions;
3030
use datafusion_common::{
31-
internal_datafusion_err, internal_err, Constraints, Result, ScalarValue, Statistics,
31+
Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
3232
};
3333
use datafusion_execution::{
34-
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
34+
SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
3535
};
3636
use datafusion_expr::Operator;
3737
use datafusion_physical_expr::expressions::BinaryExpr;
3838
use datafusion_physical_expr::projection::ProjectionExprs;
3939
use datafusion_physical_expr::utils::reassign_expr_columns;
40-
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
40+
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
4141
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
4242
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4343
use datafusion_physical_expr_common::sort_expr::LexOrdering;
4444
use datafusion_physical_plan::{
45-
display::{display_orderings, ProjectSchemaDisplay},
45+
DisplayAs, DisplayFormatType,
46+
display::{ProjectSchemaDisplay, display_orderings},
4647
filter_pushdown::FilterPushdownPropagation,
4748
metrics::ExecutionPlanMetricsSet,
48-
DisplayAs, DisplayFormatType,
4949
};
5050
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
5151

@@ -303,7 +303,9 @@ impl FileScanConfigBuilder {
303303
match self.clone().with_projection_indices(indices) {
304304
Ok(builder) => builder,
305305
Err(e) => {
306-
warn!("Failed to push down projection in FileScanConfigBuilder::with_projection: {e}");
306+
warn!(
307+
"Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
308+
);
307309
self
308310
}
309311
}
@@ -643,16 +645,16 @@ impl DataSource for FileScanConfig {
643645
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
644646
if let Some(partition) = partition {
645647
// Get statistics for a specific partition
646-
if let Some(file_group) = self.file_groups.get(partition) {
647-
if let Some(stat) = file_group.file_statistics(None) {
648-
// Project the statistics based on the projection
649-
let output_schema = self.projected_schema()?;
650-
return if let Some(projection) = self.file_source.projection() {
651-
projection.project_statistics(stat.clone(), &output_schema)
652-
} else {
653-
Ok(stat.clone())
654-
};
655-
}
648+
if let Some(file_group) = self.file_groups.get(partition)
649+
&& let Some(stat) = file_group.file_statistics(None)
650+
{
651+
// Project the statistics based on the projection
652+
let output_schema = self.projected_schema()?;
653+
return if let Some(projection) = self.file_source.projection() {
654+
projection.project_statistics(stat.clone(), &output_schema)
655+
} else {
656+
Ok(stat.clone())
657+
};
656658
}
657659
// If no statistics available for this partition, return unknown
658660
Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
@@ -1217,16 +1219,16 @@ mod tests {
12171219
use std::collections::HashMap;
12181220

12191221
use super::*;
1220-
use crate::test_util::col;
12211222
use crate::TableSchema;
1223+
use crate::test_util::col;
12221224
use crate::{
12231225
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
12241226
verify_sort_integrity,
12251227
};
12261228

12271229
use arrow::datatypes::Field;
12281230
use datafusion_common::stats::Precision;
1229-
use datafusion_common::{internal_err, ColumnStatistics};
1231+
use datafusion_common::{ColumnStatistics, internal_err};
12301232
use datafusion_expr::{Operator, SortExpr};
12311233
use datafusion_physical_expr::create_physical_sort_expr;
12321234
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
@@ -1267,7 +1269,7 @@ mod tests {
12671269
use chrono::TimeZone;
12681270
use datafusion_common::DFSchema;
12691271
use datafusion_expr::execution_props::ExecutionProps;
1270-
use object_store::{path::Path, ObjectMeta};
1272+
use object_store::{ObjectMeta, path::Path};
12711273

12721274
struct File {
12731275
name: &'static str,
@@ -1368,12 +1370,16 @@ mod tests {
13681370
true,
13691371
)]),
13701372
files: vec![
1371-
File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
1373+
File::new_nullable(
1374+
"0",
1375+
"2023-01-01",
1376+
vec![Some((Some(0.00), Some(0.49)))],
1377+
),
13721378
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
13731379
File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
13741380
],
13751381
sort: vec![col("value").sort(true, false)],
1376-
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1382+
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
13771383
},
13781384
TestCase {
13791385
name: "nullable sort columns, nulls first",
@@ -1384,11 +1390,15 @@ mod tests {
13841390
)]),
13851391
files: vec![
13861392
File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1387-
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
1393+
File::new_nullable(
1394+
"1",
1395+
"2023-01-01",
1396+
vec![Some((Some(0.50), Some(1.00)))],
1397+
),
13881398
File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
13891399
],
13901400
sort: vec![col("value").sort(true, true)],
1391-
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1401+
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
13921402
},
13931403
TestCase {
13941404
name: "all three non-overlapping",
@@ -1444,7 +1454,9 @@ mod tests {
14441454
File::new("2", "2023-01-02", vec![None]),
14451455
],
14461456
sort: vec![col("value").sort(true, false)],
1447-
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
1457+
expected_result: Err(
1458+
"construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
1459+
),
14481460
},
14491461
];
14501462

@@ -1621,10 +1633,12 @@ mod tests {
16211633
"test.parquet".to_string(),
16221634
1024,
16231635
)])])
1624-
.with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
1625-
Column::new("date", 0),
1626-
))]
1627-
.into()])
1636+
.with_output_ordering(vec![
1637+
[PhysicalSortExpr::new_default(Arc::new(Column::new(
1638+
"date", 0,
1639+
)))]
1640+
.into(),
1641+
])
16281642
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
16291643
.with_newlines_in_values(true)
16301644
.build();

datafusion/datasource/src/file_sink_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
use std::sync::Arc;
1919

20+
use crate::ListingTableUrl;
2021
use crate::file_groups::FileGroup;
2122
use crate::sink::DataSink;
22-
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
23-
use crate::ListingTableUrl;
23+
use crate::write::demux::{DemuxedStreamReceiver, start_demuxer_task};
2424

2525
use arrow::datatypes::{DataType, SchemaRef};
2626
use datafusion_common::Result;

0 commit comments

Comments
 (0)