
Commit 6797e9a

Fixing local VCF reads without compression and optimizing GCS reads

1 parent: a1380de

File tree: 5 files changed, +66 -68 lines


datafusion/vcf/examples/datafusion_integration.rs

Lines changed: 3 additions & 5 deletions
@@ -1,6 +1,4 @@
 use std::sync::Arc;
-use datafusion::config::CsvOptions;
-use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::prelude::SessionContext;
 use datafusion_vcf::table_provider::VcfTableProvider;

@@ -20,12 +18,12 @@ async fn main() -> datafusion::error::Result<()> {
     let ctx = SessionContext::new();
     ctx.sql("set datafusion.execution.skip_physical_aggregate_schema_check=true").await?;
     // let table_provider = VcfTableProvider::new("/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".parse().unwrap(), vec!["SVTYPE".parse().unwrap()], vec![], Some(8))?;
-    let table_provider = VcfTableProvider::new(path, infos, None, Some(4))?;
+    let table_provider = VcfTableProvider::new(path, infos, None, Some(4), None, None)?;
     ctx.register_table("custom_table", Arc::new(table_provider)).expect("TODO: panic message");
     // let df = ctx.sql("SELECT svtype, count(*) as cnt FROM custom_table group by svtype").await?;
-    let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
+    // let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
     // df.clone().write_csv("/tmp/gnomad.exomes.v4.1.sites.chr21-old.csv", DataFrameWriteOptions::default(), Some(CsvOptions::default())).await?;
-    // let df = ctx.sql("SELECT * FROM custom_table LIMIT 5").await?;
+    let df = ctx.sql("SELECT chrom FROM custom_table LIMIT 5").await?;
     // println!("{:?}", df.explain(false, false)?);
     df.show().await.expect("TODO: panic message");
     // println!("{:?}", );

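For orientation, a minimal sketch of the updated call site, assuming (the table-provider file itself is not visible in this extract) that the two new trailing arguments to VcfTableProvider::new are Option<usize> knobs for the remote read chunk size in MiB and the number of concurrent range fetches, mirroring the chunk_size and concurrent_fetches fields added to VcfExec below; path and infos are hypothetical placeholders:

    use std::sync::Arc;
    use datafusion::prelude::SessionContext;
    use datafusion_vcf::table_provider::VcfTableProvider;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let path = "/tmp/example.vcf.bgz".to_string(); // hypothetical input file
        let infos = None;                              // assumed Option<Vec<String>> of INFO fields
        let ctx = SessionContext::new();
        // Some(64) MiB chunks / Some(8) concurrent fetches mirror the
        // unwrap_or defaults VcfExec applies when None, None is passed.
        let provider = VcfTableProvider::new(path, infos, None, Some(4), Some(64), Some(8))?;
        ctx.register_table("custom_table", Arc::new(provider))?;
        ctx.sql("SELECT chrom FROM custom_table LIMIT 5").await?.show().await?;
        Ok(())
    }
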
datafusion/vcf/examples/noodles_test.rs

Lines changed: 4 additions & 4 deletions
@@ -67,10 +67,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     }


-    // let builder = Gcs::default()
-    //     .bucket(BUCKET)
-    //     .disable_vm_metadata()
-    //     .allow_anonymous();
+    let builder = Gcs::default()
+        .bucket(BUCKET)
+        .disable_vm_metadata()
+        .allow_anonymous();
     //
     // let operator = Operator::new(builder)?.finish();
     //

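The change above re-enables the anonymous GCS builder in this scratch example. For reference, a minimal sketch of finishing it into a usable operator, following the same opendal layering that storage.rs applies below (the bucket name is a placeholder):

    use opendal::{layers::RetryLayer, services::Gcs, Operator};

    // Build an unauthenticated GCS operator for reading public objects.
    fn anonymous_gcs_operator(bucket: &str) -> Result<Operator, opendal::Error> {
        let builder = Gcs::default()
            .bucket(bucket)         // e.g. a public genomics bucket
            .disable_vm_metadata()  // skip the GCE metadata-server credential probe
            .allow_anonymous();     // allow requests without credentials
        // Same Operator::new(...).layer(...).finish() pattern as get_remote_stream.
        Ok(Operator::new(builder)?
            .layer(RetryLayer::new().with_max_times(3))
            .finish())
    }
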
datafusion/vcf/src/physical_exec.rs

Lines changed: 21 additions & 27 deletions
@@ -2,37 +2,25 @@ use std::any::Any;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;

-use datafusion::arrow::array::{Array, Float64Array, Int32Array, NullArray, StringArray, UInt32Array};
-use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
+use datafusion::arrow::array::{Array, Float64Array, NullArray, StringArray, UInt32Array};
+use datafusion::arrow::datatypes::{DataType, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::DataFusionError;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use futures::{stream, StreamExt, TryStreamExt};
-use std::{io, str};
-use std::fs::File;
-use std::hash::Hasher;
-use std::io::Error;
-use std::num::NonZero;
-use std::ops::Deref;
+use std::str;
 use std::time::Instant;
 use async_stream::__private::AsyncStream;
 use async_stream::try_stream;
-use datafusion::arrow::ipc::FieldBuilder;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use env_logger::builder;
-use log::{debug, info};
-use noodles::vcf;
+use log::debug;
 use noodles::vcf::Header;
 use noodles::vcf::header::Infos;
-use noodles::vcf::header::record::value::map::Info;
-use noodles::vcf::header::record::value::map::info::{Number, Type};
-use noodles::vcf::io::Reader;
 use noodles::vcf::variant::Record;
 use noodles::vcf::variant::record::{AlternateBases, Filters, Ids, ReferenceBases};
 use noodles::vcf::variant::record::info::field::{Value, value::Array as ValueArray};
-use noodles_bgzf::MultithreadedReader;
-use crate::storage::{get_compression_type, get_local_vcf_bgzf_reader, get_remote_stream_bgzf, get_remote_vcf_bgzf_reader, get_remote_vcf_header, get_remote_vcf_reader, get_storage_type, CompressionType, StorageType, VcfRemoteReader};
+use crate::storage::{get_local_vcf_bgzf_reader, get_storage_type, StorageType, VcfRemoteReader};
 use crate::table_provider::{info_to_arrow_type, OptionalField};

 fn build_record_batch(
@@ -192,7 +180,6 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef,
     let mut filters: Vec<String> = Vec::with_capacity(batch_size);

     let mut count: usize = 0;
-    let mut record_num = 0;
     let mut batch_num = 0;
     let schema = Arc::clone(&schema_ref);
     let file_path = file_path.clone();
@@ -210,7 +197,6 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef,
         let iter_start_time = Instant::now();
         while count < batch_size {
             let record = records.next();
-            record_num += 1;
             if record.is_none() {
                 break;
             }
@@ -262,8 +248,8 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef,

 async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef,
                                batch_size: usize,
-                               info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>) -> datafusion::error::Result<AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output=()> + Sized>> {
-    let mut reader = VcfRemoteReader::new(file_path.clone()).await;
+                               info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>, chunk_size: usize, concurrent_fetches: usize) -> datafusion::error::Result<AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output=()> + Sized>> {
+    let mut reader = VcfRemoteReader::new(file_path.clone(), chunk_size, concurrent_fetches).await;
     let header = reader.read_header().await?;
     let infos = header.infos();
     let mut info_builders: (Vec<String>, Vec<DataType>, Vec<OptionalField>) = (Vec::new(), Vec::new(), Vec::new());
@@ -370,7 +356,7 @@ fn set_info_builders(batch_size: usize, info_fields: Option<Vec<String>>, infos:

 async fn get_stream(file_path: String, schema_ref: SchemaRef, batch_size: usize,
                     thread_num: Option<usize>,
-                    info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>) -> datafusion::error::Result<SendableRecordBatchStream> {
+                    info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>, chunk_size: usize, concurrent_fetches: usize) -> datafusion::error::Result<SendableRecordBatchStream> {
     // Open the BGZF-indexed VCF using IndexedReader.

     let file_path = file_path.clone();
@@ -383,7 +369,7 @@ async fn get_stream(file_path: String, schema_ref: SchemaRef, batch_size: usize,
             Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))
         },
         StorageType::GCS | StorageType::S3 => {
-            let stream = get_remote_vcf_stream(file_path.clone(), schema.clone(), batch_size, info_fields, projection).await?;
+            let stream = get_remote_vcf_stream(file_path.clone(), schema.clone(), batch_size, info_fields, projection, chunk_size, concurrent_fetches).await?;
             Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))
         },
         _ => panic!("Unsupported storage type")
@@ -402,6 +388,8 @@ pub struct VcfExec {
     pub(crate) cache: PlanProperties,
     pub(crate) limit: Option<usize>,
     pub(crate) thread_num: Option<usize>,
+    pub(crate) chunk_size: Option<usize>,
+    pub(crate) concurrent_fetches: Option<usize>,
 }


@@ -415,6 +403,8 @@ impl VcfExec {
         cache: PlanProperties,
         limit: Option<usize>,
         thread_num: Option<usize>,
+        chunk_size: Option<usize>,
+        concurrent_fetches: Option<usize>,
     ) -> Self {
         debug!("VcfExec::new");
         Self {
@@ -426,21 +416,23 @@ impl VcfExec {
             cache,
             limit,
             thread_num,
+            chunk_size,
+            concurrent_fetches
         }
     }
 }


 impl Debug for VcfExec {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
         Ok(())
     }
 }



 impl DisplayAs for VcfExec {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+    fn fmt_as(&self, _t: DisplayFormatType, _f: &mut Formatter) -> std::fmt::Result {
         Ok(())
     }

@@ -464,7 +456,7 @@ impl ExecutionPlan for VcfExec {
         vec![]
     }

-    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+    fn with_new_children(self: Arc<Self>, _children: Vec<Arc<dyn ExecutionPlan>>) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }

@@ -479,7 +471,9 @@ impl ExecutionPlan for VcfExec {
             schema.clone(),
             batch_size, self.thread_num,
             self.info_fields.clone(),
-            self.projection.clone());
+            self.projection.clone(),
+            self.chunk_size.unwrap_or(64),
+            self.concurrent_fetches.unwrap_or(8));
         let stream = futures::stream::once(fut).try_flatten();
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))

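Taken together, VcfExec now carries two optional read-tuning fields and resolves them at execution time. A tiny self-contained sketch of that defaulting step (the helper name is hypothetical; the fallback values 64 and 8 come from the unwrap_or calls above):

    // Mirrors the defaulting VcfExec::execute performs before calling get_stream.
    fn effective_read_options(
        chunk_size: Option<usize>,         // MiB per remote range request
        concurrent_fetches: Option<usize>, // parallel range requests
    ) -> (usize, usize) {
        (chunk_size.unwrap_or(64), concurrent_fetches.unwrap_or(8))
    }

    // Callers pass None to accept the defaults:
    // effective_read_options(None, None) == (64, 8)
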
datafusion/vcf/src/storage.rs

Lines changed: 23 additions & 22 deletions
@@ -1,10 +1,8 @@
 use std::fs::File;
-use std::io;
 use std::io::Error;
 use std::num::NonZero;
-use std::sync::Arc;
 use bytes::Bytes;
-use futures::{Stream, StreamExt};
+use futures::StreamExt;
 use futures::stream::BoxStream;
 use log::debug;
 use noodles::{bgzf, vcf};
@@ -14,7 +12,7 @@ use noodles_bgzf::{AsyncReader, MultithreadedReader};
 use opendal::{FuturesBytesStream, Operator};
 use opendal::layers::{LoggingLayer, RetryLayer, TimeoutLayer};
 use opendal::services::{Gcs, S3};
-use tokio::io::{AsyncRead, BufReader};
+use tokio::io::BufReader;
 use tokio_util::io::StreamReader;


@@ -95,8 +93,8 @@ pub fn get_compression_type(file_path: String) -> CompressionType {



-pub async fn get_remote_stream_bgzf(file_path: String) -> Result<AsyncReader<StreamReader<FuturesBytesStream, bytes::Bytes>>, opendal::Error> {
-    let remote_stream = StreamReader::new(get_remote_stream(file_path.clone()).await?);
+pub async fn get_remote_stream_bgzf(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> Result<AsyncReader<StreamReader<FuturesBytesStream, bytes::Bytes>>, opendal::Error> {
+    let remote_stream = StreamReader::new(get_remote_stream(file_path.clone(), chunk_size, concurrent_fetches).await?);
     Ok(bgzf::r#async::Reader::new(remote_stream))

 }
@@ -127,7 +125,7 @@ fn get_bucket_name(file_path: String) -> String {
     bucket_name.to_string()
 }

-pub async fn get_remote_stream(file_path: String) -> Result<FuturesBytesStream, opendal::Error> {
+pub async fn get_remote_stream(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> Result<FuturesBytesStream, opendal::Error> {
     let storage_type = get_storage_type(file_path.clone());
     let bucket_name = get_bucket_name(file_path.clone());
     let file_path = get_file_path(file_path.clone());
@@ -142,7 +140,10 @@ pub async fn get_remote_stream(file_path: String) -> Result<FuturesBytesStream,
                 .layer(RetryLayer::new().with_max_times(3))
                 .layer(LoggingLayer::default())
                 .finish();
-            operator.reader_with(file_path.as_str()).concurrent(1).await?.into_bytes_stream(..).await
+            operator.reader_with(file_path.as_str())
+                .chunk(chunk_size * 1024 * 1024)
+                .concurrent(concurrent_fetches)
+                .await?.into_bytes_stream(..).await
         }
         StorageType::S3 => {
             let builder = S3::default()
@@ -162,15 +163,15 @@ pub async fn get_remote_stream(file_path: String) -> Result<FuturesBytesStream,
         }
     }
 }

-pub async fn get_remote_vcf_bgzf_reader(file_path: String) -> vcf::r#async::io::Reader<AsyncReader<StreamReader<FuturesBytesStream, Bytes>>> {
-    let inner = get_remote_stream_bgzf(file_path.clone()).await.unwrap();
-    let mut reader = vcf::r#async::io::Reader::new(inner);
+pub async fn get_remote_vcf_bgzf_reader(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> vcf::r#async::io::Reader<AsyncReader<StreamReader<FuturesBytesStream, Bytes>>> {
+    let inner = get_remote_stream_bgzf(file_path.clone(), chunk_size, concurrent_fetches).await.unwrap();
+    let reader = vcf::r#async::io::Reader::new(inner);
     reader
 }

-pub async fn get_remote_vcf_reader(file_path: String) -> vcf::r#async::io::Reader<StreamReader<FuturesBytesStream, Bytes>> {
-    let inner = StreamReader::new(get_remote_stream(file_path.clone()).await.unwrap());
-    let mut reader = vcf::r#async::io::Reader::new(inner);
+pub async fn get_remote_vcf_reader(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> vcf::r#async::io::Reader<StreamReader<FuturesBytesStream, Bytes>> {
+    let inner = StreamReader::new(get_remote_stream(file_path.clone(), chunk_size, concurrent_fetches).await.unwrap());
+    let reader = vcf::r#async::io::Reader::new(inner);
     reader
 }

@@ -187,7 +188,7 @@ pub fn get_local_vcf_bgzf_reader(file_path: String, thread_num: usize) -> Result

 pub async fn get_local_vcf_reader(file_path: String) -> Result<vcf::r#async::io::Reader<BufReader<tokio::fs::File>>, Error> {
     debug!("Reading VCF file from local storage with async reader");
-    let reader = tokio::fs::File::open("sample.vcf")
+    let reader = tokio::fs::File::open(file_path)
         .await
         .map(BufReader::new)
         .map(vcf::r#async::io::Reader::new)?;
@@ -210,15 +211,15 @@ pub async fn get_local_vcf_header(file_path: String, thread_num: usize) -> Resul
     Ok(header)
 }

-pub async fn get_remote_vcf_header(file_path: String) -> Result<vcf::Header, Error> {
+pub async fn get_remote_vcf_header(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> Result<vcf::Header, Error> {
     let compression_type = get_compression_type(file_path.clone());
     let header = match compression_type {
         CompressionType::BGZF | CompressionType::GZIP => {
-            let mut reader = get_remote_vcf_bgzf_reader(file_path).await;
+            let mut reader = get_remote_vcf_bgzf_reader(file_path, chunk_size, concurrent_fetches).await;
             reader.read_header().await?
         }
         CompressionType::NONE => {
-            let mut reader = get_remote_vcf_reader(file_path).await;
+            let mut reader = get_remote_vcf_reader(file_path, chunk_size, concurrent_fetches).await;
             reader.read_header().await?
         }
     };
@@ -232,7 +233,7 @@ pub async fn get_header(file_path: String) -> Result<vcf::Header, Error> {
             get_local_vcf_header(file_path, 1).await?
         }
         _ => {
-            get_remote_vcf_header(file_path).await?
+            get_remote_vcf_header(file_path, 64, 1).await?
         }
     };
     Ok(header)
@@ -244,15 +245,15 @@ pub enum VcfRemoteReader {
 }

 impl VcfRemoteReader {
-    pub async fn new(file_path: String) -> Self {
+    pub async fn new(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> Self {
         let compression_type = get_compression_type(file_path.clone());
         match compression_type {
             CompressionType::BGZF | CompressionType::GZIP => {
-                let reader = get_remote_vcf_bgzf_reader(file_path).await;
+                let reader = get_remote_vcf_bgzf_reader(file_path, chunk_size, concurrent_fetches).await;
                 VcfRemoteReader::BGZF(reader)
             }
             CompressionType::NONE => {
-                let reader = get_remote_vcf_reader(file_path).await;
+                let reader = get_remote_vcf_reader(file_path, chunk_size, concurrent_fetches).await;
                 VcfRemoteReader::PLAIN(reader)
             }
         }

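The heart of the GCS read optimization is the reader configuration in get_remote_stream: the previous single sequential request (.concurrent(1)) becomes chunked, concurrent range fetches, while header probing via get_header keeps a conservative 64 MiB / concurrency-1 setting since it only needs the start of the file. A minimal sketch of that call pattern against an already-built opendal Operator, using only the methods that appear in the diff above:

    use opendal::{FuturesBytesStream, Operator};

    // Chunked, concurrent range reads, as in the GCS branch of get_remote_stream.
    async fn open_byte_stream(
        operator: &Operator,
        path: &str,
        chunk_size: usize,         // MiB per range request
        concurrent_fetches: usize, // requests kept in flight
    ) -> Result<FuturesBytesStream, opendal::Error> {
        operator
            .reader_with(path)
            .chunk(chunk_size * 1024 * 1024) // opendal takes the chunk size in bytes
            .concurrent(concurrent_fetches)
            .await?
            .into_bytes_stream(..) // stream the entire object
            .await
    }

This pairs with the local-path fix above: get_local_vcf_reader now opens the requested file_path instead of a hard-coded "sample.vcf", which is what makes uncompressed local VCFs readable.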