Skip to content

Commit 0a0416d

Browse files
adriangbclaude
andauthored
Move newlines_in_values from FileScanConfig to CsvSource (#19313)
## Summary This PR moves the CSV-specific `newlines_in_values` configuration option from `FileScanConfig` (a shared format-agnostic configuration) to `CsvSource` where it belongs. - Add `newlines_in_values` field and methods to `CsvSource` - Add `has_newlines_in_values()` method to `FileSource` trait (returns `false` by default) - Update `FileSource::repartitioned()` to use the new trait method - Remove `new_lines_in_values` from `FileScanConfig` and its builder - Update proto serialization to read from/write to `CsvSource` - Update tests and documentation - Add migration guide to `upgrading.md` Closes #18453 ## Test plan - [x] All existing tests pass - [x] Doc tests pass - [x] Proto roundtrip tests pass - [x] Clippy clean 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent cb3fa1c commit 0a0416d

File tree

9 files changed

+84
-49
lines changed

9 files changed

+84
-49
lines changed

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ mod tests {
128128
let config =
129129
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
130130
.with_file_compression_type(file_compression_type)
131-
.with_newlines_in_values(false)
132131
.with_projection_indices(Some(vec![0, 2, 4]))?
133132
.build();
134133

@@ -200,7 +199,6 @@ mod tests {
200199
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
201200
let config =
202201
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
203-
.with_newlines_in_values(false)
204202
.with_file_compression_type(file_compression_type.to_owned())
205203
.with_projection_indices(Some(vec![4, 0, 2]))?
206204
.build();
@@ -272,7 +270,6 @@ mod tests {
272270
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
273271
let config =
274272
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
275-
.with_newlines_in_values(false)
276273
.with_file_compression_type(file_compression_type.to_owned())
277274
.with_limit(Some(5))
278275
.build();
@@ -343,7 +340,6 @@ mod tests {
343340
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
344341
let config =
345342
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
346-
.with_newlines_in_values(false)
347343
.with_file_compression_type(file_compression_type.to_owned())
348344
.with_limit(Some(5))
349345
.build();
@@ -412,7 +408,6 @@ mod tests {
412408
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
413409
let config =
414410
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
415-
.with_newlines_in_values(false)
416411
.with_file_compression_type(file_compression_type.to_owned())
417412
// We should be able to project on the partition column
418413
// Which is supposed to be after the file fields
@@ -518,7 +513,6 @@ mod tests {
518513
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
519514
let config =
520515
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
521-
.with_newlines_in_values(false)
522516
.with_file_compression_type(file_compression_type.to_owned())
523517
.build();
524518
let csv = DataSourceExec::from_data_source(config);

datafusion/datasource-arrow/src/source.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,9 +460,7 @@ impl FileSource for ArrowSource {
460460
// Use the default trait implementation logic for file format
461461
use datafusion_datasource::file_groups::FileGroupPartitioner;
462462

463-
if config.file_compression_type.is_compressed()
464-
|| config.new_lines_in_values
465-
{
463+
if config.file_compression_type.is_compressed() {
466464
return Ok(None);
467465
}
468466

datafusion/datasource-csv/src/file_format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ impl FileFormat for CsvFormat {
437437

438438
let mut csv_options = self.options.clone();
439439
csv_options.has_header = Some(has_header);
440+
csv_options.newlines_in_values = Some(newlines_in_values);
440441

441442
// Get the existing CsvSource and update its options
442443
// We need to preserve the table_schema from the original source (which includes partition columns)
@@ -449,7 +450,6 @@ impl FileFormat for CsvFormat {
449450

450451
let config = FileScanConfigBuilder::from(conf)
451452
.with_file_compression_type(self.options.compression.into())
452-
.with_newlines_in_values(newlines_in_values)
453453
.with_source(source)
454454
.build();
455455

datafusion/datasource-csv/src/source.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use tokio::io::AsyncWriteExt;
7272
/// has_header: Some(true),
7373
/// delimiter: b',',
7474
/// quote: b'"',
75+
/// newlines_in_values: Some(true), // The file contains newlines in values
7576
/// ..Default::default()
7677
/// };
7778
/// let source = Arc::new(CsvSource::new(file_schema.clone())
@@ -81,7 +82,6 @@ use tokio::io::AsyncWriteExt;
8182
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
8283
/// let config = FileScanConfigBuilder::new(object_store_url, source)
8384
/// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
84-
/// .with_newlines_in_values(true) // The file contains newlines in values;
8585
/// .build();
8686
/// let exec = (DataSourceExec::from_data_source(config));
8787
/// ```
@@ -176,6 +176,11 @@ impl CsvSource {
176176
conf.options.truncated_rows = Some(truncate_rows);
177177
conf
178178
}
179+
180+
/// Whether values may contain newline characters
181+
pub fn newlines_in_values(&self) -> bool {
182+
self.options.newlines_in_values.unwrap_or(false)
183+
}
179184
}
180185

181186
impl CsvSource {
@@ -297,6 +302,13 @@ impl FileSource for CsvSource {
297302
fn file_type(&self) -> &str {
298303
"csv"
299304
}
305+
306+
fn supports_repartitioning(&self) -> bool {
307+
// Cannot repartition if values may contain newlines, as record
308+
// boundaries cannot be determined by byte offset alone
309+
!self.options.newlines_in_values.unwrap_or(false)
310+
}
311+
300312
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
301313
match t {
302314
DisplayFormatType::Default | DisplayFormatType::Verbose => {

datafusion/datasource/src/file.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,21 @@ pub trait FileSource: Send + Sync {
8686
Ok(())
8787
}
8888

89+
/// Returns whether this file source supports repartitioning files by byte ranges.
90+
///
91+
/// When this returns `true`, files can be split into multiple partitions
92+
/// based on byte offsets for parallel reading.
93+
///
94+
/// When this returns `false`, files cannot be repartitioned (e.g., CSV files
95+
/// with `newlines_in_values` enabled cannot be split because record boundaries
96+
/// cannot be determined by byte offset alone).
97+
///
98+
/// The default implementation returns `true`. File sources that cannot support
99+
/// repartitioning should override this method.
100+
fn supports_repartitioning(&self) -> bool {
101+
true
102+
}
103+
89104
/// If supported by the [`FileSource`], redistribute files across partitions
90105
/// according to their size. Allows custom file formats to implement their
91106
/// own repartitioning logic.
@@ -99,7 +114,8 @@ pub trait FileSource: Send + Sync {
99114
output_ordering: Option<LexOrdering>,
100115
config: &FileScanConfig,
101116
) -> Result<Option<FileScanConfig>> {
102-
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
117+
if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
118+
{
103119
return Ok(None);
104120
}
105121

datafusion/datasource/src/file_scan_config.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,6 @@ pub struct FileScanConfig {
160160
pub output_ordering: Vec<LexOrdering>,
161161
/// File compression type
162162
pub file_compression_type: FileCompressionType,
163-
/// Are new lines in values supported for CSVOptions
164-
pub new_lines_in_values: bool,
165163
/// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
166164
pub file_source: Arc<dyn FileSource>,
167165
/// Batch size while creating new batches
@@ -251,7 +249,6 @@ pub struct FileScanConfigBuilder {
251249
statistics: Option<Statistics>,
252250
output_ordering: Vec<LexOrdering>,
253251
file_compression_type: Option<FileCompressionType>,
254-
new_lines_in_values: Option<bool>,
255252
batch_size: Option<usize>,
256253
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
257254
partitioned_by_file_group: bool,
@@ -275,7 +272,6 @@ impl FileScanConfigBuilder {
275272
statistics: None,
276273
output_ordering: vec![],
277274
file_compression_type: None,
278-
new_lines_in_values: None,
279275
limit: None,
280276
constraints: None,
281277
batch_size: None,
@@ -414,16 +410,6 @@ impl FileScanConfigBuilder {
414410
self
415411
}
416412

417-
/// Set whether new lines in values are supported for CSVOptions
418-
///
419-
/// Parsing newlines in quoted values may be affected by execution behaviour such as
420-
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
421-
/// parsed successfully, which may reduce performance.
422-
pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
423-
self.new_lines_in_values = Some(new_lines_in_values);
424-
self
425-
}
426-
427413
/// Set the batch_size property
428414
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
429415
self.batch_size = batch_size;
@@ -473,7 +459,6 @@ impl FileScanConfigBuilder {
473459
statistics,
474460
output_ordering,
475461
file_compression_type,
476-
new_lines_in_values,
477462
batch_size,
478463
expr_adapter_factory: expr_adapter,
479464
partitioned_by_file_group,
@@ -485,7 +470,6 @@ impl FileScanConfigBuilder {
485470
});
486471
let file_compression_type =
487472
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
488-
let new_lines_in_values = new_lines_in_values.unwrap_or(false);
489473

490474
FileScanConfig {
491475
object_store_url,
@@ -495,7 +479,6 @@ impl FileScanConfigBuilder {
495479
file_groups,
496480
output_ordering,
497481
file_compression_type,
498-
new_lines_in_values,
499482
batch_size,
500483
expr_adapter_factory: expr_adapter,
501484
statistics,
@@ -513,7 +496,6 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
513496
statistics: Some(config.statistics),
514497
output_ordering: config.output_ordering,
515498
file_compression_type: Some(config.file_compression_type),
516-
new_lines_in_values: Some(config.new_lines_in_values),
517499
limit: config.limit,
518500
constraints: Some(config.constraints),
519501
batch_size: config.batch_size,
@@ -945,6 +927,22 @@ impl FileScanConfig {
945927
Ok(())
946928
}
947929

930+
/// Returns whether newlines in values are supported.
931+
///
932+
/// This method always returns `false`. The actual newlines_in_values setting
933+
/// has been moved to [`CsvSource`] and should be accessed via
934+
/// [`CsvSource::csv_options()`] instead.
935+
///
936+
/// [`CsvSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
937+
/// [`CsvSource::csv_options()`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
938+
#[deprecated(
939+
since = "52.0.0",
940+
note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
941+
)]
942+
pub fn newlines_in_values(&self) -> bool {
943+
false
944+
}
945+
948946
#[deprecated(
949947
since = "52.0.0",
950948
note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
@@ -954,17 +952,6 @@ impl FileScanConfig {
954952
props.constraints().clone()
955953
}
956954

957-
/// Specifies whether newlines in (quoted) values are supported.
958-
///
959-
/// Parsing newlines in quoted values may be affected by execution behaviour such as
960-
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
961-
/// parsed successfully, which may reduce performance.
962-
///
963-
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
964-
pub fn newlines_in_values(&self) -> bool {
965-
self.new_lines_in_values
966-
}
967-
968955
#[deprecated(
969956
since = "52.0.0",
970957
note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
@@ -1793,7 +1780,6 @@ mod tests {
17931780
.into(),
17941781
])
17951782
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1796-
.with_newlines_in_values(true)
17971783
.build();
17981784

17991785
// Verify the built config has all the expected values
@@ -1820,7 +1806,6 @@ mod tests {
18201806
config.file_compression_type,
18211807
FileCompressionType::UNCOMPRESSED
18221808
);
1823-
assert!(config.new_lines_in_values);
18241809
assert_eq!(config.output_ordering.len(), 1);
18251810
}
18261811

@@ -1915,7 +1900,6 @@ mod tests {
19151900
config.file_compression_type,
19161901
FileCompressionType::UNCOMPRESSED
19171902
);
1918-
assert!(!config.new_lines_in_values);
19191903
assert!(config.output_ordering.is_empty());
19201904
assert!(config.constraints.is_empty());
19211905

@@ -1963,7 +1947,6 @@ mod tests {
19631947
.with_limit(Some(10))
19641948
.with_file(file.clone())
19651949
.with_constraints(Constraints::default())
1966-
.with_newlines_in_values(true)
19671950
.build();
19681951

19691952
// Create a new builder from the config
@@ -1993,7 +1976,6 @@ mod tests {
19931976
"test_file.parquet"
19941977
);
19951978
assert_eq!(new_config.constraints, Constraints::default());
1996-
assert!(new_config.new_lines_in_values);
19971979
}
19981980

19991981
#[test]

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ impl protobuf::PhysicalPlanNode {
631631
has_header: Some(scan.has_header),
632632
delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
633633
quote: str_to_byte(&scan.quote, "quote")?,
634+
newlines_in_values: Some(scan.newlines_in_values),
634635
..Default::default()
635636
};
636637
let source = Arc::new(
@@ -646,7 +647,6 @@ impl protobuf::PhysicalPlanNode {
646647
extension_codec,
647648
source,
648649
)?)
649-
.with_newlines_in_values(scan.newlines_in_values)
650650
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
651651
.build();
652652
Ok(DataSourceExec::from_data_source(conf))
@@ -2631,7 +2631,7 @@ impl protobuf::PhysicalPlanNode {
26312631
} else {
26322632
None
26332633
},
2634-
newlines_in_values: maybe_csv.newlines_in_values(),
2634+
newlines_in_values: csv_config.newlines_in_values(),
26352635
truncate_rows: csv_config.truncate_rows(),
26362636
},
26372637
)),

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,6 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
932932
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
933933
.with_projection_indices(Some(vec![0, 1]))?
934934
.with_file_group(FileGroup::new(vec![file_group]))
935-
.with_newlines_in_values(false)
936935
.build();
937936

938937
roundtrip_test(DataSourceExec::from_data_source(scan_config))

docs/source/library-user-guide/upgrading.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,40 @@ See <https://github.com/apache/datafusion/issues/19056> for more details.
5757

5858
Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias.
5959

60+
### `newlines_in_values` moved from `FileScanConfig` to `CsvOptions`
61+
62+
The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvOptions`, as it only applies to CSV file parsing.
63+
64+
**Who is affected:**
65+
66+
- Users who set `newlines_in_values` via `FileScanConfigBuilder::with_newlines_in_values()`
67+
68+
**Migration guide:**
69+
70+
Set `newlines_in_values` in `CsvOptions` instead of on `FileScanConfigBuilder`:
71+
72+
**Before:**
73+
74+
```rust,ignore
75+
let source = Arc::new(CsvSource::new(file_schema.clone()));
76+
let config = FileScanConfigBuilder::new(object_store_url, source)
77+
.with_newlines_in_values(true)
78+
.build();
79+
```
80+
81+
**After:**
82+
83+
```rust,ignore
84+
let options = CsvOptions {
85+
newlines_in_values: Some(true),
86+
..Default::default()
87+
};
88+
let source = Arc::new(CsvSource::new(file_schema.clone())
89+
.with_csv_options(options));
90+
let config = FileScanConfigBuilder::new(object_store_url, source)
91+
.build();
92+
```
93+
6094
### Removal of `pyarrow` feature
6195

6296
The `pyarrow` feature flag has been removed. This feature has been migrated to

0 commit comments

Comments
 (0)