
Commit 95e6854

Adding support for remote reading of uncompressed VCFs

1 parent 8603e6e · commit 95e6854

File tree

7 files changed: +116 −24 lines

Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered by default.)

datafusion/vcf/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ noodles = { version = "0.91.0", features = ["bam", "sam", "vcf", "bgzf", "async
 noodles-bgzf = { version = "0.36.0" ,features = ["libdeflate"] }
 tokio = { version = "1.43.0", features = ["rt-multi-thread", "rt", "macros"] }
 object_store = {version = "0.11.2", features = ["gcp"] }
-tokio-util = { version="0.7.13", features =["io-util"] }
+tokio-util = { version="0.7.13", features = ["io-util", "compat"] }
 bytes = "1.10.0"
 opendal = { version = "0.51.2", features = ["services-gcs", "services-s3","layers-blocking"] }
 log = "0.4.22"
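
The only manifest change is enabling tokio-util's `compat` feature alongside `io-util`, presumably so that `futures`-flavoured and `tokio`-flavoured async readers can be bridged; the diff itself does not show where the feature is used. A minimal sketch of what `compat` provides, outside this repository's code (the reader argument is a stand-in, not the crate's actual plumbing):

// Illustration only, not part of this commit: tokio-util's `compat` adapters
// convert between futures-io and tokio-io readers.
use tokio::io::AsyncReadExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;

async fn read_all(futures_reader: impl futures::io::AsyncRead + Unpin) -> std::io::Result<Vec<u8>> {
    // Wrap a futures-io reader so tokio-based code can drive it.
    let mut reader = futures_reader.compat();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(buf)
}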

datafusion/vcf/examples/bgzf.rs

Lines changed: 0 additions & 6 deletions
@@ -7,12 +7,6 @@ use noodles::{bgzf, vcf};
 use noodles::vcf::io::Reader;
 use noodles_bgzf::MultithreadedReader;
 
-pub fn get_local_vcf_reader(file_path: String, thread_num: usize) -> Result<Reader<MultithreadedReader<File>>, Error> {
-    debug!("Reading VCF file from local storage with {} threads", thread_num);
-    File::open(file_path)
-        .map(|f| noodles_bgzf::MultithreadedReader::with_worker_count(NonZero::new(thread_num).unwrap(), f))
-        .map(vcf::io::Reader::new)
-}
 
 #[tokio::main(flavor = "multi_thread")]
 async fn main() -> datafusion::error::Result<()> {

datafusion/vcf/examples/datafusion_integration.rs

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,8 @@ async fn main() -> datafusion::error::Result<()> {
     env_logger::init();
     // let path = "gs://gcp-public-data--gnomad/release/4.1/vcf/exomes/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
     // let path = "gs://gcp-public-data--gnomad/release/4.1/genome_sv/gnomad.v4.1.sv.sites.vcf.gz".to_string();
-    let path ="/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
+    let path = "gs://genomics-public-data/platinum-genomes/vcf/NA12878_S1.genome.vcf".to_string();
+    // let path ="/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
     // let path ="/tmp/gnomad.v4.1.sv.sites.vcf.gz".to_string();
     // let infos = Some(Vec::from(["AC".to_string(), "AF".to_string(), "AN".to_string(), "FS".to_string(), "AN_raw".to_string(), "variant_type".to_string(), "AS_culprit".to_string(), "only_het".to_string()]));
     // let infos = Some(Vec::from(["SVTYPE".to_string()]));
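
The example now defaults to a public, uncompressed remote VCF. A hedged sketch of how such a path could be queried through `VcfTableProvider` (the constructor arguments follow the `new` signature visible in table_provider.rs below; it assumes the code lives inside this crate so `crate::table_provider::VcfTableProvider` resolves, and the table name and SQL are illustrative):

// Illustration only, not part of this commit.
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use crate::table_provider::VcfTableProvider; // assumes this sketch lives in the crate

async fn query_plain_remote_vcf() -> datafusion::error::Result<()> {
    let path = "gs://genomics-public-data/platinum-genomes/vcf/NA12878_S1.genome.vcf".to_string();
    // No INFO/FORMAT projection, single reader thread.
    let provider = VcfTableProvider::new(path, None, None, Some(1))?;

    let ctx = SessionContext::new();
    ctx.register_table("vcf", Arc::new(provider))?;
    ctx.sql("SELECT * FROM vcf LIMIT 10").await?.show().await?;
    Ok(())
}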

datafusion/vcf/src/physical_exec.rs

Lines changed: 7 additions & 7 deletions
@@ -32,7 +32,7 @@ use noodles::vcf::variant::Record;
 use noodles::vcf::variant::record::{AlternateBases, Filters, Ids};
 use noodles::vcf::variant::record::info::field::{Value, value::Array as ValueArray};
 use noodles_bgzf::MultithreadedReader;
-use crate::storage::{get_local_vcf_reader, get_remote_stream_bgzf, get_remote_vcf_reader, get_storage_type, StorageType};
+use crate::storage::{get_compression_type, get_local_vcf_bgzf_reader, get_remote_stream_bgzf, get_remote_vcf_bgzf_reader, get_remote_vcf_header, get_remote_vcf_reader, get_storage_type, CompressionType, StorageType, VcfRemoteReader};
 use crate::table_provider::{info_to_arrow_type, OptionalField};
 
 fn build_record_batch(
@@ -148,7 +148,7 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usi
     let schema = Arc::clone(&schema_ref);
     let file_path = file_path.clone();
     let thread_num = thread_num.unwrap_or(1);
-    let mut reader = get_local_vcf_reader(file_path, thread_num)?;
+    let mut reader = get_local_vcf_bgzf_reader(file_path, thread_num)?;
     let header = reader.read_header()?;
     let infos = header.infos();
 
@@ -206,12 +206,14 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usi
 
 
 async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size: usize, info_fields: Option<Vec<String>>) -> datafusion::error::Result<AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output=()> + Sized>> {
-    let mut reader = get_remote_vcf_reader(file_path.clone()).await;
+    let mut reader = VcfRemoteReader::new(file_path.clone()).await;
     let header = reader.read_header().await?;
     let infos = header.infos();
     let mut info_builders: (Vec<String>, Vec<DataType>, Vec<OptionalField>) = (Vec::new(), Vec::new(), Vec::new());
     set_info_builders(batch_size, info_fields, &infos, &mut info_builders);
 
+
+
     let stream = try_stream! {
         // Create vectors for accumulating record data.
         let mut chroms: Vec<String> = Vec::with_capacity(batch_size);
@@ -231,7 +233,8 @@ async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size:
 
 
         // Process records one by one.
-        let mut records = reader.records();
+
+        let mut records = reader.read_records().await;
         while let Some(result) = records.next().await {
             let record = result?; // propagate errors if any
             chroms.push(record.reference_sequence_name().to_string());
@@ -244,9 +247,6 @@ async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size:
             filters.push(record.filters().iter(&header).map(|v| v.unwrap_or(".").to_string()).collect::<Vec<String>>().join(";"));
             load_infos(Box::new(record), &header, &mut info_builders);
             record_num += 1;
-
-
-
             // Once the batch size is reached, build and yield a record batch.
             if record_num % batch_size == 0 {
                 debug!("Record number: {}", record_num);
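
The remote path now obtains its record stream from `VcfRemoteReader::read_records()` and, as before, accumulates rows inside an `async_stream::try_stream!` block, yielding one `RecordBatch` every `batch_size` records. A reduced, generic sketch of that batching pattern (illustrative names, not the crate's code):

// Illustration only: the try_stream! batching pattern used by
// get_remote_vcf_stream, reduced to collecting items into fixed-size batches.
use async_stream::try_stream;
use futures::{Stream, StreamExt};

fn batch_items<S, T>(mut items: S, batch_size: usize) -> impl Stream<Item = std::io::Result<Vec<T>>>
where
    S: Stream<Item = std::io::Result<T>> + Unpin,
{
    try_stream! {
        let mut buf = Vec::with_capacity(batch_size);
        while let Some(item) = items.next().await {
            buf.push(item?);                    // propagate I/O errors to the consumer
            if buf.len() == batch_size {
                yield std::mem::take(&mut buf); // emit a full batch and start a new one
            }
        }
        if !buf.is_empty() {
            yield buf;                          // emit the final, partially filled batch
        }
    }
}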

datafusion/vcf/src/storage.rs

Lines changed: 100 additions & 3 deletions
@@ -1,14 +1,20 @@
 use std::fs::File;
+use std::io;
 use std::io::Error;
 use std::num::NonZero;
 use std::sync::Arc;
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use futures::stream::BoxStream;
 use log::debug;
 use noodles::{bgzf, vcf};
 use noodles::vcf::io::Reader;
+use noodles::vcf::Record;
 use noodles_bgzf::{AsyncReader, MultithreadedReader};
 use opendal::{FuturesBytesStream, Operator};
 use opendal::layers::{LoggingLayer, RetryLayer, TimeoutLayer};
 use opendal::services::{Gcs, S3};
+use tokio::io::{AsyncRead, BufReader};
 use tokio_util::io::StreamReader;
 
 
@@ -28,7 +34,7 @@ impl CompressionType {
     }
 
     fn from_string(compression_type: String) -> Self {
-        match compression_type.as_str() {
+        match compression_type.to_lowercase().as_str() {
             "gz" => CompressionType::GZIP,
             "bgz" => CompressionType::BGZF,
             "none" => CompressionType::NONE,
@@ -79,6 +85,9 @@ fn get_file_path(file_path: String) -> String {
 
 pub fn get_compression_type(file_path: String) -> CompressionType {
     //extract the file extension from path
+    if file_path.to_lowercase().ends_with(".vcf") {
+        return CompressionType::NONE;
+    }
     let file_extension = file_path.split('.').last().unwrap();
     //return the compression type
     CompressionType::from_string(file_extension.to_string())
@@ -153,20 +162,108 @@ pub async fn get_remote_stream(file_path: String) -> Result<FuturesBytesStream,
     }
 }
 
-pub async fn get_remote_vcf_reader(file_path: String) -> vcf::r#async::io::Reader<AsyncReader<StreamReader<FuturesBytesStream, bytes::Bytes>>> {
+pub async fn get_remote_vcf_bgzf_reader(file_path: String) -> vcf::r#async::io::Reader<AsyncReader<StreamReader<FuturesBytesStream, Bytes>>> {
     let inner = get_remote_stream_bgzf(file_path.clone()).await.unwrap();
     let mut reader = vcf::r#async::io::Reader::new(inner);
     reader
 }
 
+pub async fn get_remote_vcf_reader(file_path: String) -> vcf::r#async::io::Reader<StreamReader<FuturesBytesStream, Bytes>> {
+    let inner = StreamReader::new(get_remote_stream(file_path.clone()).await.unwrap());
+    let mut reader = vcf::r#async::io::Reader::new(inner);
+    reader
+}
+
+
 
-pub fn get_local_vcf_reader(file_path: String, thread_num: usize) -> Result<Reader<MultithreadedReader<File>>, Error> {
+
+pub fn get_local_vcf_bgzf_reader(file_path: String, thread_num: usize) -> Result<Reader<MultithreadedReader<File>>, Error> {
     debug!("Reading VCF file from local storage with {} threads", thread_num);
     File::open(file_path)
         .map(|f| noodles_bgzf::MultithreadedReader::with_worker_count(NonZero::new(thread_num).unwrap(), f))
         .map(vcf::io::Reader::new)
 }
 
 
+pub async fn get_local_vcf_reader(file_path: String) -> Result<vcf::r#async::io::Reader<BufReader<tokio::fs::File>>, Error> {
+    debug!("Reading VCF file from local storage with async reader");
+    let reader = tokio::fs::File::open("sample.vcf")
+        .await
+        .map(BufReader::new)
+        .map(vcf::r#async::io::Reader::new)?;
+    Ok(reader)
+}
+
+
+pub async fn get_local_vcf_header(file_path: String, thread_num: usize) -> Result<vcf::Header, Error> {
+    let compression_type = get_compression_type(file_path.clone());
+    let header = match compression_type {
+        CompressionType::BGZF | CompressionType::GZIP => {
+            let mut reader = get_local_vcf_bgzf_reader(file_path, thread_num)?;
+            reader.read_header()?
+        }
+        CompressionType::NONE => {
+            let mut reader = get_local_vcf_reader(file_path).await?;
+            reader.read_header().await?
+        }
+    };
+    Ok(header)
+}
 
+pub async fn get_remote_vcf_header(file_path: String) -> Result<vcf::Header, Error> {
+    let compression_type = get_compression_type(file_path.clone());
+    let header = match compression_type {
+        CompressionType::BGZF | CompressionType::GZIP => {
+            let mut reader = get_remote_vcf_bgzf_reader(file_path).await;
+            reader.read_header().await?
+        }
+        CompressionType::NONE => {
+            let mut reader = get_remote_vcf_reader(file_path).await;
+            reader.read_header().await?
+        }
+    };
+    Ok(header)
+}
+
+pub enum VcfRemoteReader {
+    BGZF(vcf::r#async::io::Reader<AsyncReader<StreamReader<FuturesBytesStream, Bytes>>>),
+    PLAIN(vcf::r#async::io::Reader<StreamReader<FuturesBytesStream, Bytes>>)
+}
+
+impl VcfRemoteReader {
+    pub async fn new(file_path: String) -> Self {
+        let compression_type = get_compression_type(file_path.clone());
+        match compression_type {
+            CompressionType::BGZF | CompressionType::GZIP => {
+                let reader = get_remote_vcf_bgzf_reader(file_path).await;
+                VcfRemoteReader::BGZF(reader)
+            }
+            CompressionType::NONE => {
+                let reader = get_remote_vcf_reader(file_path).await;
+                VcfRemoteReader::PLAIN(reader)
+            }
+        }
+    }
+    pub async fn read_header(&mut self) -> Result<vcf::Header, Error> {
+        match self {
+            VcfRemoteReader::BGZF(reader) => {
+                reader.read_header().await
+            }
+            VcfRemoteReader::PLAIN(reader) => {
+                reader.read_header().await
+            }
+        }
+    }
+
+    pub async fn read_records(&mut self) -> BoxStream<'_, Result<Record, Error>> {
+        match self {
+            VcfRemoteReader::BGZF(reader) => {
+                reader.records().boxed()
+            }
+            VcfRemoteReader::PLAIN(reader) => {
+                reader.records().boxed()
            }
        }
    }
}
 
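
Taken together, the new helpers give remote VCFs a single entry point: `VcfRemoteReader::new` dispatches on the extension via `get_compression_type` (with `.vcf` now mapping to `CompressionType::NONE`), and `read_header`/`read_records` hide whether a BGZF or plain reader sits underneath. A usage sketch based on the API above (the function and its error handling are illustrative, and it assumes the code lives inside this crate so `crate::storage` resolves):

// Illustration only, not part of this commit: driving VcfRemoteReader end to end.
use futures::StreamExt;
use crate::storage::VcfRemoteReader; // assumes this sketch lives in the crate

async fn count_remote_records(path: &str) -> std::io::Result<usize> {
    // Extension-based dispatch: ".vcf" -> PLAIN, ".gz"/".bgz" -> BGZF.
    let mut reader = VcfRemoteReader::new(path.to_string()).await;
    let header = reader.read_header().await?;
    println!("{} INFO fields in header", header.infos().len());

    let mut records = reader.read_records().await;
    let mut n = 0;
    while let Some(record) = records.next().await {
        let _record = record?; // propagate read/parse errors
        n += 1;
    }
    Ok(n)
}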

datafusion/vcf/src/table_provider.rs

Lines changed: 5 additions & 6 deletions
@@ -17,7 +17,7 @@ use noodles::vcf::Header;
 use noodles::vcf::header::Infos;
 use noodles::vcf::header::record::value::map::info::{Number, Type};
 use crate::physical_exec::VcfExec;
-use crate::storage::{get_local_vcf_reader, get_remote_vcf_reader, get_storage_type, StorageType};
+use crate::storage::{get_local_vcf_bgzf_reader, get_local_vcf_header, get_remote_vcf_bgzf_reader, get_remote_vcf_header, get_storage_type, StorageType};
 
 async fn determine_schema_from_header(
     file_path: &str,
@@ -26,10 +26,7 @@ async fn determine_schema_from_header(
 ) -> datafusion::common::Result<SchemaRef> {
 
     let storage_type = get_storage_type(file_path.to_string());
-    let header = match storage_type {
-        StorageType::LOCAL => get_local_vcf_reader(file_path.to_string(), 1)?.read_header()?,
-        _ => get_remote_vcf_reader(file_path.to_string()).await.read_header().await?
-    };
+    let header = get_remote_vcf_header(file_path.to_string()).await?;
     let header_infos = header.infos();
 
     let mut fields = vec![
@@ -64,6 +61,7 @@ pub struct VcfTableProvider {
     format_fields: Option<Vec<String>>,
     schema: SchemaRef,
     thread_num: Option<usize>,
+
 }
 
 impl VcfTableProvider {
@@ -73,7 +71,7 @@ impl VcfTableProvider {
         format_fields: Option<Vec<String>>,
         thread_num: Option<usize>,
     ) -> datafusion::common::Result<Self> {
-        let schema = block_on(determine_schema_from_header(&file_path, &info_fields, &format_fields)).unwrap();
+        let schema = block_on(determine_schema_from_header(&file_path, &info_fields, &format_fields))?;
         Ok(Self {
             file_path,
             info_fields,
@@ -101,6 +99,7 @@ impl TableProvider for VcfTableProvider {
     }
 
     async fn scan(&self, state: &dyn Session, projection: Option<&Vec<usize>>, _filters: &[Expr], limit: Option<usize>) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+
        Ok(Arc::new(VcfExec {
            cache: PlanProperties::new( EquivalenceProperties::new(self.schema.clone()),
                Partitioning::UnknownPartitioning(1),
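
Schema discovery now goes through `get_remote_vcf_header` and then maps each INFO definition to an Arrow type via `info_to_arrow_type`. That mapping is not part of this diff; the sketch below shows one plausible shape for it, purely as an illustration of how VCF `Type`/`Number` pairs can translate to Arrow types:

// Illustration only: info_to_arrow_type's real implementation is not shown in
// this commit; this is one plausible Type/Number -> Arrow mapping.
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field};
use noodles::vcf::header::record::value::map::info::{Number, Type};

fn example_info_arrow_type(ty: Type, number: Number) -> DataType {
    let scalar = match ty {
        Type::Integer => DataType::Int32,
        Type::Float => DataType::Float32,
        Type::Flag => DataType::Boolean,
        // Character, String (and anything else) fall back to Utf8.
        _ => DataType::Utf8,
    };
    match number {
        // Exactly one value per record -> scalar column.
        Number::Count(1) => scalar,
        // A, R, G or unknown cardinality -> list column.
        _ => DataType::List(Arc::new(Field::new("item", scalar, true))),
    }
}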
