Skip to content

Commit acf22b5

Browse files
ccciudatu authored and cdobre committed
[HSTACK] - consolidate partitions if we have many small files
1 parent 70004bd commit acf22b5

File tree

1 file changed

+43
-21
lines changed

1 file changed

+43
-21
lines changed

datafusion/datasource/src/file_groups.rs

Lines changed: 43 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -220,7 +220,13 @@ impl FileGroupPartitioner {
220220
.iter()
221221
.map(|f| f.object_meta.size as i64)
222222
.sum::<i64>();
223-
if total_size < (repartition_file_min_size as i64) || total_size == 0 {
223+
224+
// bail if we are asked to *split* a set of files that are already too small
225+
// if we are being asked to consolidate, we proceed
226+
if (total_size < (repartition_file_min_size as i64)
227+
&& target_partitions >= file_groups.len())
228+
|| total_size == 0
229+
{
224230
return None;
225231
}
226232

@@ -236,30 +242,46 @@ impl FileGroupPartitioner {
236242
.scan(
237243
(current_partition_index, current_partition_size),
238244
|state, source_file| {
239-
let mut produced_files = vec![];
240-
let mut range_start = 0;
241-
while range_start < source_file.object_meta.size {
242-
let range_end = min(
243-
range_start + (target_partition_size - state.1),
244-
source_file.object_meta.size,
245-
);
246-
247-
let mut produced_file = source_file.clone();
248-
produced_file.range = Some(FileRange {
249-
start: range_start as i64,
250-
end: range_end as i64,
251-
});
252-
produced_files.push((state.0, produced_file));
253-
254-
if state.1 + (range_end - range_start) >= target_partition_size {
245+
// Skip splitting files smaller than repartition_file_min_size
246+
// This may result in a few more partitions than requested (maybe 1 more)
247+
if source_file.object_meta.size > 0
248+
&& source_file.object_meta.size < repartition_file_min_size as u64
249+
{
250+
state.1 += source_file.object_meta.size;
251+
if state.1 > target_partition_size {
255252
state.0 += 1;
256253
state.1 = 0;
257-
} else {
258-
state.1 += range_end - range_start;
259254
}
260-
range_start = range_end;
255+
let small_file = (state.0, source_file.clone());
256+
Some(vec![small_file])
257+
} else {
258+
let mut produced_files = vec![];
259+
let mut range_start = 0;
260+
while range_start < source_file.object_meta.size {
261+
let range_end = min(
262+
range_start + (target_partition_size - state.1),
263+
source_file.object_meta.size,
264+
);
265+
266+
let mut produced_file = source_file.clone();
267+
produced_file.range = Some(FileRange {
268+
start: range_start as i64,
269+
end: range_end as i64,
270+
});
271+
produced_files.push((state.0, produced_file));
272+
273+
if state.1 + (range_end - range_start)
274+
>= target_partition_size
275+
{
276+
state.0 += 1;
277+
state.1 = 0;
278+
} else {
279+
state.1 += range_end - range_start;
280+
}
281+
range_start = range_end;
282+
}
283+
Some(produced_files)
261284
}
262-
Some(produced_files)
263285
},
264286
)
265287
.flatten()

0 commit comments

Comments
 (0)