File tree Expand file tree Collapse file tree 1 file changed +6
-5
lines changed
Expand file tree Collapse file tree 1 file changed +6
-5
lines changed Original file line number Diff line number Diff line change 1212
1313use std:: { io:: Cursor , sync:: Arc } ;
1414
15- use futures:: future;
16- use futures:: stream:: FuturesUnordered ;
15+ use futures:: future:: { self , try_join_all} ;
1716use itertools:: Itertools ;
1817use manifest:: ManifestReader ;
1918use manifest_list:: read_snapshot;
@@ -326,7 +325,7 @@ async fn datafiles(
326325 None => Box :: new ( manifests. iter ( ) ) ,
327326 } ;
328327
329- let stream : FuturesUnordered < _ > = iter
328+ let futures : Vec < _ > = iter
330329 . map ( move |file| {
331330 let object_store = object_store. clone ( ) ;
332331 async move {
@@ -344,8 +343,10 @@ async fn datafiles(
344343 } )
345344 . collect ( ) ;
346345
347- Ok ( stream. flat_map ( move |result| {
348- let ( bytes, path, sequence_number) = result. unwrap ( ) ;
346+ let results = try_join_all ( futures) . await ?;
347+
348+ Ok ( stream:: iter ( results) . flat_map ( move |result| {
349+ let ( bytes, path, sequence_number) = result;
349350
350351 let reader = ManifestReader :: new ( bytes) . unwrap ( ) ;
351352 stream:: iter ( reader) . try_filter_map ( move |mut x| {
You can’t perform that action at this time.
0 commit comments