Skip to content

Commit 7c21773

Browse files
committed
Optimize variant_end
1 parent 9e92efd commit 7c21773

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

datafusion/vcf/src/physical_exec.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use noodles::vcf::header::record::value::map::Info;
2929
use noodles::vcf::header::record::value::map::info::{Number, Type};
3030
use noodles::vcf::io::Reader;
3131
use noodles::vcf::variant::Record;
32-
use noodles::vcf::variant::record::{AlternateBases, Filters, Ids};
32+
use noodles::vcf::variant::record::{AlternateBases, Filters, Ids, ReferenceBases};
3333
use noodles::vcf::variant::record::info::field::{Value, value::Array as ValueArray};
3434
use noodles_bgzf::MultithreadedReader;
3535
use crate::storage::{get_compression_type, get_local_vcf_bgzf_reader, get_remote_stream_bgzf, get_remote_vcf_bgzf_reader, get_remote_vcf_header, get_remote_vcf_reader, get_storage_type, CompressionType, StorageType, VcfRemoteReader};
@@ -131,6 +131,24 @@ fn builders_to_arrays(builders: &mut Vec<OptionalField>) -> Vec<Arc<dyn Array>>
131131
builders.iter_mut().map(|f| f.finish()).collect::<Vec<Arc<dyn Array>>>()
132132
}
133133

134+
/// Returns the end position of `record` as a `u32`.
///
/// Fast path: when REF is a single `A`/`C`/`G`/`T` base and the record has exactly one ALT
/// allele that is also a single such base, the end equals the start, so the start position
/// is returned directly and the `Record::variant_end` lookup is skipped.
///
/// Every other record goes through `Record::variant_end`, including any record whose bases
/// or start position cannot be decoded during the pre-check. Failures are therefore
/// reported in one place.
///
/// # Panics
///
/// Panics if `Record::variant_end` returns an error.
fn get_variant_end(record: &dyn Record, header: &Header) -> u32 {
    // Bind the accessors once instead of re-querying the record for each check.
    let reference_bases = record.reference_bases();
    let alternate_bases = record.alternate_bases();

    // `alternate_bases.len()` is the number of ALT alleles; the per-allele check below is
    // what guarantees that the single allele is itself one base long.
    // A decode error simply fails the pattern match, so it disqualifies the fast path
    // rather than panicking here.
    let is_simple_snv = reference_bases.len() == 1
        && alternate_bases.len() == 1
        && reference_bases
            .iter()
            .all(|base| matches!(base, Ok(b'A' | b'C' | b'G' | b'T')))
        && alternate_bases
            .iter()
            .all(|allele| matches!(allele, Ok("A" | "C" | "G" | "T")));

    if is_simple_snv {
        // A missing or undecodable start falls through to the general path below.
        if let Some(Ok(start)) = record.variant_start() {
            // NOTE(review): `as u32` matches how callers store the start position;
            // assumes positions fit in 32 bits — confirm.
            return start.get() as u32;
        }
    }

    record
        .variant_end(header)
        .expect("failed to compute variant end position")
        .get() as u32
}
150+
151+
134152

135153
async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usize, thread_num: Option<usize>, info_fields: Option<Vec<String>>) -> datafusion::error::Result<impl futures::Stream<Item = datafusion::error::Result<RecordBatch>>> {
136154
let mut chroms: Vec<String> = Vec::with_capacity(batch_size);
@@ -169,7 +187,7 @@ async fn get_local_vcf(file_path: String, schema_ref: SchemaRef, batch_size: usi
169187
// For each record, fill the fixed columns.
170188
chroms.push(record.reference_sequence_name().to_string());
171189
poss.push(record.variant_start().unwrap().unwrap().get() as u32);
172-
pose.push(record.variant_end(&header).unwrap().get() as u32);
190+
pose.push(get_variant_end(&record, &header));
173191
ids.push(record.ids().iter().map(|v| v.to_string()).collect::<Vec<String>>().join(";"));
174192
refs.push(record.reference_bases().to_string());
175193
alts.push(record.alternate_bases().iter().map(|v| v.unwrap_or(".").to_string()).collect::<Vec<String>>().join("|"));
@@ -238,8 +256,8 @@ async fn get_remote_vcf_stream(file_path: String, schema: SchemaRef, batch_size:
238256
while let Some(result) = records.next().await {
239257
let record = result?; // propagate errors if any
240258
chroms.push(record.reference_sequence_name().to_string());
241-
poss.push(record.variant_start().unwrap().unwrap().get() as u32);
242-
pose.push(record.variant_end(&header).unwrap().get() as u32);
259+
poss.push(record.variant_start().unwrap()?.get() as u32);
260+
pose.push(get_variant_end(&record, &header));
243261
ids.push(record.ids().iter().map(|v| v.to_string()).collect::<Vec<String>>().join(";"));
244262
refs.push(record.reference_bases().to_string());
245263
alts.push(record.alternate_bases().iter().map(|v| v.unwrap_or(".").to_string()).collect::<Vec<String>>().join("|"));

0 commit comments

Comments
 (0)