Skip to content

Commit a1380de

Browse files
committed
Enabling projection
1 parent 7c21773 commit a1380de

File tree

3 files changed

+99
-22
lines changed

3 files changed

+99
-22
lines changed

datafusion/vcf/examples/datafusion_integration.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use std::sync::Arc;
2+
use datafusion::config::CsvOptions;
3+
use datafusion::dataframe::DataFrameWriteOptions;
24
use datafusion::prelude::SessionContext;
35
use datafusion_vcf::table_provider::VcfTableProvider;
46

@@ -11,17 +13,19 @@ async fn main() -> datafusion::error::Result<()> {
1113
// let path ="/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
1214
let path ="/tmp/gnomad.v4.1.sv.sites.vcf.gz".to_string();
1315
// let infos = Some(Vec::from(["AC".to_string(), "AF".to_string(), "AN".to_string(), "FS".to_string(), "AN_raw".to_string(), "variant_type".to_string(), "AS_culprit".to_string(), "only_het".to_string()]));
14-
// let infos = Some(Vec::from(["SVTYPE".to_string()]));
16+
let infos = Some(Vec::from(["SVTYPE".to_string()]));
1517
// let infos = Some(Vec::from(["AF".to_string()]));
16-
let infos = None;
18+
// let infos = None;
1719
// Create a new context with the default configuration
1820
let ctx = SessionContext::new();
1921
ctx.sql("set datafusion.execution.skip_physical_aggregate_schema_check=true").await?;
2022
// let table_provider = VcfTableProvider::new("/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".parse().unwrap(), vec!["SVTYPE".parse().unwrap()], vec![], Some(8))?;
21-
let table_provider = VcfTableProvider::new(path, infos, None, Some(1))?;
23+
let table_provider = VcfTableProvider::new(path, infos, None, Some(4))?;
2224
ctx.register_table("custom_table", Arc::new(table_provider)).expect("TODO: panic message");
23-
// let df = ctx.sql("SELECT count(*) FROM custom_table").await?;
24-
let df = ctx.sql("SELECT * FROM custom_table LIMIT 5").await?;
25+
// let df = ctx.sql("SELECT svtype, count(*) as cnt FROM custom_table group by svtype").await?;
26+
let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
27+
// df.clone().write_csv("/tmp/gnomad.exomes.v4.1.sites.chr21-old.csv", DataFrameWriteOptions::default(), Some(CsvOptions::default())).await?;
28+
// let df = ctx.sql("SELECT * FROM custom_table LIMIT 5").await?;
2529
// println!("{:?}", df.explain(false, false)?);
2630
df.show().await.expect("TODO: panic message");
2731
// println!("{:?}", );

datafusion/vcf/src/physical_exec.rs

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::any::Any;
22
use std::fmt::{Debug, Formatter};
33
use std::sync::Arc;
44

5-
use datafusion::arrow::array::{Array, Float64Array, Int32Array, StringArray, UInt32Array};
6-
use datafusion::arrow::datatypes::{DataType, SchemaRef};
5+
use datafusion::arrow::array::{Array, Float64Array, Int32Array, NullArray, StringArray, UInt32Array};
6+
use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
77
use datafusion::arrow::record_batch::RecordBatch;
88
use datafusion::common::DataFusionError;
99
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
@@ -46,6 +46,7 @@ fn build_record_batch(
4646
quals: &[f64],
4747
filters: &[String],
4848
infos: Option<&Vec<Arc<dyn Array>>>,
49+
projection: Option<Vec<usize>>,
4950

5051
) -> datafusion::error::Result<RecordBatch> {
5152
let chrom_array = Arc::new(StringArray::from(chroms.to_vec())) as Arc<dyn Array>;
@@ -56,11 +57,39 @@ fn build_record_batch(
5657
let alt_array = Arc::new(StringArray::from(alts.to_vec())) as Arc<dyn Array>;
5758
let qual_array = Arc::new(Float64Array::from(quals.to_vec())) as Arc<dyn Array>;
5859
let filter_array = Arc::new(StringArray::from(filters.to_vec())) as Arc<dyn Array>;
60+
let arrays = match projection {
61+
None => {
62+
let mut arrays: Vec<Arc<dyn Array>> = vec![
63+
chrom_array, pos_start_array, pos_end_array, id_array, ref_array, alt_array, qual_array, filter_array,
64+
];
65+
arrays.append(&mut infos.unwrap().clone());
66+
arrays
5967

60-
let mut arrays: Vec<Arc<dyn Array>> = vec![
61-
chrom_array, pos_start_array, pos_end_array, id_array, ref_array, alt_array, qual_array, filter_array,
62-
];
63-
arrays.append(&mut infos.unwrap().clone());
68+
}
69+
Some(proj_ids) => {
70+
let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(ids.len());
71+
if proj_ids.is_empty() {
72+
debug!("Empty projection creating a dummy field");
73+
arrays.push(Arc::new(NullArray::new(chrom_array.len())) as Arc<dyn Array>);
74+
}
75+
else {
76+
for i in proj_ids.clone() {
77+
match i {
78+
0 => arrays.push(chrom_array.clone()),
79+
1 => arrays.push(pos_start_array.clone()),
80+
2 => arrays.push(pos_end_array.clone()),
81+
3 => arrays.push(id_array.clone()),
82+
4 => arrays.push(ref_array.clone()),
83+
5 => arrays.push(alt_array.clone()),
84+
6 => arrays.push(qual_array.clone()),
85+
7 => arrays.push(filter_array.clone()),
86+
_ => arrays.push(infos.unwrap()[i-8].clone())
87+
}
88+
}
89+
}
90+
arrays
91+
}
92+
};
6493
RecordBatch::try_new(schema.clone(), arrays)
6594
.map_err(|e| DataFusionError::Execution(format!("Error creating batch: {:?}", e)))
6695
}
@@ -150,7 +179,9 @@ fn get_variant_end(record: &dyn Record, header: &Header) -> u32 {
150179

151180

152181

153-
async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usize, thread_num: Option<usize>, info_fields: Option<Vec<String>>) -> datafusion::error::Result<impl futures::Stream<Item = datafusion::error::Result<RecordBatch>>> {
182+
async fn get_local_vcf(file_path: String, schema_ref: SchemaRef,
183+
batch_size: usize, thread_num: Option<usize>,
184+
info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>) -> datafusion::error::Result<impl futures::Stream<Item = datafusion::error::Result<RecordBatch>>> {
154185
let mut chroms: Vec<String> = Vec::with_capacity(batch_size);
155186
let mut poss: Vec<u32> = Vec::with_capacity(batch_size);
156187
let mut pose: Vec<u32> = Vec::with_capacity(batch_size);
@@ -203,7 +234,13 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usi
203234
debug!("Time elapsed in iterating records: {:?}", duration);
204235
debug!("Batch number: {}", batch_num);
205236
let start_time = Instant::now();
206-
let batch = build_record_batch(Arc::clone(&schema), &chroms, &poss, &pose, &ids, &refs, &alts, &quals, &filters, Some(&builders_to_arrays(&mut info_builders.2))).unwrap();
237+
let batch = build_record_batch(Arc::clone(&schema),
238+
&chroms, &poss, &pose,
239+
&ids, &refs, &alts,
240+
&quals, &filters,
241+
Some(&builders_to_arrays(&mut info_builders.2)),
242+
projection.clone()
243+
).unwrap();
207244
let duration = start_time.elapsed();
208245
debug!("Time elapsed in building batch: {:?}", duration);
209246
count = 0;
@@ -223,7 +260,9 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usi
223260

224261

225262

226-
async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size: usize, info_fields: Option<Vec<String>>) -> datafusion::error::Result<AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output=()> + Sized>> {
263+
async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef,
264+
batch_size: usize,
265+
info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>) -> datafusion::error::Result<AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output=()> + Sized>> {
227266
let mut reader = VcfRemoteReader::new(file_path.clone()).await;
228267
let header = reader.read_header().await?;
229268
let infos = header.infos();
@@ -278,7 +317,7 @@ async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size:
278317
&alts,
279318
&quals,
280319
&filters,
281-
Some(&builders_to_arrays(&mut info_builders.2)),
320+
Some(&builders_to_arrays(&mut info_builders.2)), projection.clone(),
282321
// if infos.is_empty() { None } else { Some(&infos) },
283322

284323
)?;
@@ -310,7 +349,7 @@ async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size:
310349
&alts,
311350
&quals,
312351
&filters,
313-
Some(&builders_to_arrays(&mut info_builders.2))
352+
Some(&builders_to_arrays(&mut info_builders.2)), projection.clone(),
314353
// if infos.is_empty() { None } else { Some(&infos) },
315354
)?;
316355
yield batch;
@@ -329,7 +368,9 @@ fn set_info_builders(batch_size: usize, info_fields: Option<Vec<String>>, infos:
329368
}
330369
}
331370

332-
async fn get_stream(file_path: String, schema_ref: SchemaRef, batch_size: usize, thread_num: Option<usize>, info_fields: Option<Vec<String>>) -> datafusion::error::Result<SendableRecordBatchStream> {
371+
async fn get_stream(file_path: String, schema_ref: SchemaRef, batch_size: usize,
372+
thread_num: Option<usize>,
373+
info_fields: Option<Vec<String>>, projection: Option<Vec<usize>>) -> datafusion::error::Result<SendableRecordBatchStream> {
333374
// Open the BGZF-indexed VCF using IndexedReader.
334375

335376
let file_path = file_path.clone();
@@ -338,11 +379,11 @@ async fn get_stream(file_path: String, schema_ref: SchemaRef, batch_size: usize,
338379

339380
match store_type {
340381
StorageType::LOCAL => {
341-
let stream = get_local_vcf(file_path.clone(), schema.clone(), batch_size, thread_num, info_fields).await?;
382+
let stream = get_local_vcf(file_path.clone(), schema.clone(), batch_size, thread_num, info_fields, projection).await?;
342383
Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))
343384
},
344385
StorageType::GCS | StorageType::S3=> {
345-
let stream = get_remote_vcf_stream(file_path.clone(), schema.clone(), batch_size, info_fields).await?;
386+
let stream = get_remote_vcf_stream(file_path.clone(), schema.clone(), batch_size, info_fields, projection).await?;
346387
Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))
347388
},
348389
_ => panic!("Unsupported storage type")
@@ -375,6 +416,7 @@ impl VcfExec {
375416
limit: Option<usize>,
376417
thread_num: Option<usize>,
377418
) -> Self {
419+
debug!("VcfExec::new");
378420
Self {
379421
file_path: file_path.clone(),
380422
schema,
@@ -429,9 +471,15 @@ impl ExecutionPlan for VcfExec {
429471

430472

431473
fn execute(&self, _partition: usize, context: Arc<TaskContext>) -> datafusion::common::Result<SendableRecordBatchStream> {
474+
debug!("VcfExec::execute");
475+
debug!("Projection: {:?}", self.projection);
432476
let batch_size = context.session_config().batch_size();
433477
let schema = self.schema.clone();
434-
let fut = get_stream(self.file_path.clone(), schema.clone(), batch_size, self.thread_num, self.info_fields.clone());
478+
let fut = get_stream(self.file_path.clone(),
479+
schema.clone(),
480+
batch_size, self.thread_num,
481+
self.info_fields.clone(),
482+
self.projection.clone());
435483
let stream = futures::stream::once(fut).try_flatten();
436484
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
437485

datafusion/vcf/src/table_provider.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::any::Any;
22
use std::fmt::{Debug, Formatter};
3+
use std::ops::Deref;
34
use std::ptr::null;
45
use std::sync::{Arc, Mutex};
56
use async_trait::async_trait;
@@ -90,6 +91,7 @@ impl TableProvider for VcfTableProvider {
9091
}
9192

9293
fn schema(&self) -> SchemaRef {
94+
debug!("VcfTableProvider::schema");
9395
self.schema.clone()
9496
}
9597

@@ -98,13 +100,36 @@ impl TableProvider for VcfTableProvider {
98100
}
99101

100102
async fn scan(&self, state: &dyn Session, projection: Option<&Vec<usize>>, _filters: &[Expr], limit: Option<usize>) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
103+
debug!("VcfTableProvider::scan");
104+
105+
let schema = match projection {
106+
Some(p) => {
107+
if p.len() == 0 {
108+
Arc::new(Schema::new(vec![
109+
Field::new("dummy", DataType::Null, true),
110+
]))
111+
}
112+
else {
113+
let schema_fields = self.schema.fields();
114+
let proj = projection.unwrap().clone();
115+
let mut fields: Vec<Field> = Vec::with_capacity(proj.len());
116+
for i in proj {
117+
fields.push(schema_fields[i].deref().clone());
118+
}
119+
Arc::new(Schema::new(fields))
120+
}
121+
},
122+
None => {
123+
self.schema.clone()
124+
}
125+
};
101126

102127
Ok(Arc::new(VcfExec {
103-
cache: PlanProperties::new( EquivalenceProperties::new(self.schema.clone()),
128+
cache: PlanProperties::new( EquivalenceProperties::new(schema.clone()),
104129
Partitioning::UnknownPartitioning(1),
105130
ExecutionMode::Bounded),
106131
file_path: self.file_path.clone(),
107-
schema: self.schema.clone(),
132+
schema: schema.clone(),
108133
info_fields: self.info_fields.clone(),
109134
format_fields: self.format_fields.clone(),
110135
projection: projection.cloned(),

0 commit comments

Comments
 (0)