Skip to content

Commit 290df12

Browse files
committed
Fix local VCF reading with no compression
1 parent 6797e9a commit 290df12

File tree

3 files changed

+111
-49
lines changed

3 files changed

+111
-49
lines changed

datafusion/vcf/examples/datafusion_integration.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,21 @@ async fn main() -> datafusion::error::Result<()> {
1010
// let path = "gs://genomics-public-data/platinum-genomes/vcf/NA12878_S1.genome.vcf".to_string();
1111
// let path ="/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
1212
let path ="/tmp/gnomad.v4.1.sv.sites.vcf.gz".to_string();
13+
// let path ="/tmp//NA12878_S1.genome.vcf".to_string();
1314
// let infos = Some(Vec::from(["AC".to_string(), "AF".to_string(), "AN".to_string(), "FS".to_string(), "AN_raw".to_string(), "variant_type".to_string(), "AS_culprit".to_string(), "only_het".to_string()]));
14-
let infos = Some(Vec::from(["SVTYPE".to_string()]));
15-
// let infos = Some(Vec::from(["AF".to_string()]));
16-
// let infos = None;
15+
// let infos = Some(Vec::from(["SVTYPE".to_string()]));
16+
// let infos = Some(Vec::from(["AF".to_string()])); 50722005
17+
let infos = None;
1718
// Create a new context with the default configuration
1819
let ctx = SessionContext::new();
1920
ctx.sql("set datafusion.execution.skip_physical_aggregate_schema_check=true").await?;
2021
// let table_provider = VcfTableProvider::new("/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".parse().unwrap(), vec!["SVTYPE".parse().unwrap()], vec![], Some(8))?;
2122
let table_provider = VcfTableProvider::new(path, infos, None, Some(4), None,None)?;
2223
ctx.register_table("custom_table", Arc::new(table_provider)).expect("TODO: panic message");
2324
// let df = ctx.sql("SELECT svtype, count(*) as cnt FROM custom_table group by svtype").await?;
24-
// let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
25+
let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
2526
// df.clone().write_csv("/tmp/gnomad.exomes.v4.1.sites.chr21-old.csv", DataFrameWriteOptions::default(), Some(CsvOptions::default())).await?;
26-
let df = ctx.sql("SELECT chrom FROM custom_table LIMIT 5").await?;
27+
// let df = ctx.sql("SELECT chrom FROM custom_table LIMIT 5").await?;
2728
// println!("{:?}", df.explain(false, false)?);
2829
df.show().await.expect("TODO: panic message");
2930
// println!("{:?}", );

datafusion/vcf/src/physical_exec.rs

Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use noodles::vcf::header::Infos;
2020
use noodles::vcf::variant::Record;
2121
use noodles::vcf::variant::record::{AlternateBases, Filters, Ids, ReferenceBases};
2222
use noodles::vcf::variant::record::info::field::{Value, value::Array as ValueArray};
23-
use crate::storage::{get_local_vcf_bgzf_reader, get_storage_type, StorageType, VcfRemoteReader};
23+
use crate::storage::{get_local_vcf_bgzf_reader, get_storage_type, StorageType, VcfLocalReader, VcfRemoteReader};
2424
use crate::table_provider::{info_to_arrow_type, OptionalField};
2525

2626
fn build_record_batch(
@@ -184,64 +184,81 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef,
184184
let schema = Arc::clone(&schema_ref);
185185
let file_path = file_path.clone();
186186
let thread_num = thread_num.unwrap_or(1);
187-
let mut reader = get_local_vcf_bgzf_reader(file_path, thread_num)?;
188-
let header = reader.read_header()?;
187+
let mut reader = VcfLocalReader::new(file_path.clone(), thread_num).await;
188+
let header = reader.read_header().await?;
189189
let infos = header.infos();
190-
190+
let mut record_num = 0;
191191
let mut info_builders: (Vec<String>, Vec<DataType>, Vec<OptionalField>) = (Vec::new(), Vec::new(), Vec::new());
192192
set_info_builders(batch_size, info_fields, &infos, &mut info_builders);
193193

194-
let iter = std::iter::from_fn(move || {
194+
let stream = try_stream! {
195195

196-
let mut records = reader.records();
196+
let mut records = reader.read_records();
197197
let iter_start_time = Instant::now();
198-
while count < batch_size {
199-
let record = records.next();
200-
if record.is_none() {
201-
break;
202-
}
203-
let record = record.unwrap().unwrap();
204-
// For each record, fill the fixed columns.
198+
while let Some(result) = records.next().await {
199+
let record = result?; // propagate errors if any
205200
chroms.push(record.reference_sequence_name().to_string());
206-
poss.push(record.variant_start().unwrap().unwrap().get() as u32);
201+
poss.push(record.variant_start().unwrap()?.get() as u32);
207202
pose.push(get_variant_end(&record, &header));
208203
ids.push(record.ids().iter().map(|v| v.to_string()).collect::<Vec<String>>().join(";"));
209204
refs.push(record.reference_bases().to_string());
210205
alts.push(record.alternate_bases().iter().map(|v| v.unwrap_or(".").to_string()).collect::<Vec<String>>().join("|"));
211206
quals.push(record.quality_score().unwrap_or(Ok(0.0)).unwrap() as f64);
212207
filters.push(record.filters().iter(&header).map(|v| v.unwrap_or(".").to_string()).collect::<Vec<String>>().join(";"));
213208
load_infos(Box::new(record), &header, &mut info_builders);
214-
count += 1;
209+
record_num += 1;
210+
// Once the batch size is reached, build and yield a record batch.
211+
if record_num % batch_size == 0 {
212+
debug!("Record number: {}", record_num);
213+
let batch = build_record_batch(
214+
Arc::clone(&schema.clone()),
215+
&chroms,
216+
&poss,
217+
&pose,
218+
&ids,
219+
&refs,
220+
&alts,
221+
&quals,
222+
&filters,
223+
Some(&builders_to_arrays(&mut info_builders.2)), projection.clone(),
224+
// if infos.is_empty() { None } else { Some(&infos) },
225+
226+
)?;
227+
batch_num += 1;
228+
debug!("Batch number: {}", batch_num);
229+
yield batch;
230+
// Clear vectors for the next batch.
231+
chroms.clear();
232+
poss.clear();
233+
pose.clear();
234+
ids.clear();
235+
refs.clear();
236+
alts.clear();
237+
quals.clear();
238+
filters.clear();
239+
240+
}
215241
}
216-
if count == 0 {
217-
return None;
242+
// If there are remaining records that don't fill a complete batch,
243+
// yield them as well.
244+
if !chroms.is_empty() {
245+
let batch = build_record_batch(
246+
Arc::clone(&schema.clone()),
247+
&chroms,
248+
&poss,
249+
&pose,
250+
&ids,
251+
&refs,
252+
&alts,
253+
&quals,
254+
&filters,
255+
Some(&builders_to_arrays(&mut info_builders.2)), projection.clone(),
256+
// if infos.is_empty() { None } else { Some(&infos) },
257+
)?;
258+
yield batch;
218259
}
219-
let duration = iter_start_time.elapsed();
220-
debug!("Time elapsed in iterating records: {:?}", duration);
221-
debug!("Batch number: {}", batch_num);
222-
let start_time = Instant::now();
223-
let batch = build_record_batch(Arc::clone(&schema),
224-
&chroms, &poss, &pose,
225-
&ids, &refs, &alts,
226-
&quals, &filters,
227-
Some(&builders_to_arrays(&mut info_builders.2)),
228-
projection.clone()
229-
).unwrap();
230-
let duration = start_time.elapsed();
231-
debug!("Time elapsed in building batch: {:?}", duration);
232-
count = 0;
233-
chroms.clear();
234-
poss.clear();
235-
pose.clear();
236-
ids.clear();
237-
refs.clear();
238-
alts.clear();
239-
quals.clear();
240-
filters.clear();
241-
batch_num += 1;
242-
Some(Ok(batch))
243-
});
244-
Ok(stream::iter(iter))
260+
};
261+
Ok(stream)
245262
}
246263

247264

datafusion/vcf/src/storage.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use std::fs::File;
22
use std::io::Error;
33
use std::num::NonZero;
4+
use async_stream::stream;
45
use bytes::Bytes;
5-
use futures::StreamExt;
6+
use futures::{stream, StreamExt};
67
use futures::stream::BoxStream;
78
use log::debug;
89
use noodles::{bgzf, vcf};
@@ -244,6 +245,8 @@ pub enum VcfRemoteReader {
244245
PLAIN( vcf::r#async::io::Reader<StreamReader<FuturesBytesStream, Bytes>>)
245246
}
246247

248+
249+
247250
impl VcfRemoteReader {
248251
pub async fn new(file_path: String, chunk_size: usize, concurrent_fetches: usize) -> Self {
249252
let compression_type = get_compression_type(file_path.clone());
@@ -281,3 +284,44 @@ impl VcfRemoteReader {
281284
}
282285
}
283286

287+
/// Reader over a local VCF file, dispatching on the file's compression type.
///
/// `BGZF` wraps a synchronous, multithreaded block-gzip reader;
/// `PLAIN` wraps an async reader over an uncompressed local file
/// (added so uncompressed local VCFs are readable — see commit message).
pub enum VcfLocalReader {
    // BGZF/GZIP-compressed input, decoded via a thread pool.
    BGZF(Reader<MultithreadedReader<File>>),
    // Uncompressed input, read asynchronously via tokio.
    PLAIN(vcf::r#async::io::Reader<BufReader<tokio::fs::File>>)
}
291+
292+
impl VcfLocalReader {
293+
pub async fn new(file_path: String, thread_num: usize) -> Self {
294+
let compression_type = get_compression_type(file_path.clone());
295+
match compression_type {
296+
CompressionType::BGZF | CompressionType::GZIP=> {
297+
let reader = get_local_vcf_bgzf_reader(file_path, thread_num).unwrap();
298+
VcfLocalReader::BGZF(reader)
299+
}
300+
CompressionType::NONE => {
301+
let reader = get_local_vcf_reader(file_path).await.unwrap();
302+
VcfLocalReader::PLAIN(reader)
303+
}
304+
}
305+
}
306+
pub async fn read_header(&mut self) -> Result<vcf::Header, Error> {
307+
match self {
308+
VcfLocalReader::BGZF(reader) => {
309+
reader.read_header()
310+
}
311+
VcfLocalReader::PLAIN(reader) => {
312+
reader.read_header().await
313+
}
314+
}
315+
}
316+
317+
pub fn read_records(&mut self) -> BoxStream<'_, Result<Record, Error>> {
318+
match self {
319+
VcfLocalReader::BGZF(reader) => {
320+
stream::iter(reader.records()).boxed()
321+
}
322+
VcfLocalReader::PLAIN(reader) => {
323+
reader.records().boxed()
324+
}
325+
}
326+
}
327+
}

0 commit comments

Comments
 (0)