Skip to content

Commit 9beb883

Browse files
mwiewior and claude committed
feat: Add coordinate_system_zero_based parameter to all table providers
This commit implements the coordinate system parameter for all bio format table providers (VCF, GFF, BAM, BED, CRAM) to support both 0-based and 1-based coordinate output.

Changes:
- Add `coordinate_system_zero_based: bool` parameter to all TableProvider constructors
- Store coordinate system preference in Arrow schema metadata with key `bio.coordinate_system_zero_based`
- Add `COORDINATE_SYSTEM_METADATA_KEY` constant in bio-format-core
- Update position conversion in physical_exec.rs for each format:
  - When true (default): subtract 1 from noodles 1-based positions
  - When false: use noodles positions as-is (1-based)
- Update all binaries, examples, and tests to pass the new parameter
- Fix test assertions to expect 0-based coordinates (default)

Breaking Change: All TableProvider::new() constructors now require an additional bool parameter as the last argument.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c558616 commit 9beb883

34 files changed

+1005
-74
lines changed

.github/workflows/benchmark.yml.bak

Lines changed: 651 additions & 0 deletions
Large diffs are not rendered by default.

datafusion/bio-format-bam/examples/test_bam_reader.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1414
concurrent_fetches: Some(1), // Number of concurrent requests
1515
compression_type: None,
1616
};
17-
let table =
18-
BamTableProvider::new(file_path.clone(), Some(1), Some(object_storage_options)).unwrap();
17+
let table = BamTableProvider::new(
18+
file_path.clone(),
19+
Some(1),
20+
Some(object_storage_options),
21+
true,
22+
)
23+
.unwrap();
1924

2025
let ctx = datafusion::execution::context::SessionContext::new();
2126
ctx.sql("set datafusion.execution.skip_physical_aggregate_schema_check=true")

datafusion/bio-format-bam/src/physical_exec.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub struct BamExec {
3030
pub(crate) limit: Option<usize>,
3131
pub(crate) thread_num: Option<usize>,
3232
pub(crate) object_storage_options: Option<ObjectStorageOptions>,
33+
/// If true, output 0-based half-open coordinates; if false, 1-based closed coordinates
34+
pub(crate) coordinate_system_zero_based: bool,
3335
}
3436

3537
impl Debug for BamExec {
@@ -84,6 +86,7 @@ impl ExecutionPlan for BamExec {
8486
self.thread_num,
8587
self.projection.clone(),
8688
self.object_storage_options.clone(),
89+
self.coordinate_system_zero_based,
8790
);
8891
let stream = futures::stream::once(fut).try_flatten();
8992
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
@@ -95,6 +98,7 @@ async fn get_remote_bam_stream(
9598
batch_size: usize,
9699
projection: Option<Vec<usize>>,
97100
object_storage_options: Option<ObjectStorageOptions>,
101+
coordinate_system_zero_based: bool,
98102
) -> datafusion::error::Result<
99103
AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output = ()> + Sized>,
100104
> {
@@ -140,7 +144,9 @@ async fn get_remote_bam_stream(
140144
chrom.push(chrom_name);
141145
match record.alignment_start() {
142146
Some(start_pos) => {
143-
start.push(Some(start_pos?.get() as u32));
147+
// noodles normalizes all positions to 1-based; subtract 1 for 0-based output
148+
let pos = start_pos?.get() as u32;
149+
start.push(Some(if coordinate_system_zero_based { pos - 1 } else { pos }));
144150
},
145151
None => {
146152
start.push(None);
@@ -180,7 +186,11 @@ async fn get_remote_bam_stream(
180186
);
181187
mate_chrom.push(chrom_name);
182188
match record.mate_alignment_start() {
183-
Some(start) => mate_start.push(Some(start?.get() as u32)),
189+
Some(mate_start_pos) => {
190+
// noodles normalizes all positions to 1-based; subtract 1 for 0-based output
191+
let pos = mate_start_pos?.get() as u32;
192+
mate_start.push(Some(if coordinate_system_zero_based { pos - 1 } else { pos }));
193+
},
184194
_ => mate_start.push(None),
185195
};
186196

@@ -252,6 +262,7 @@ async fn get_local_bam(
252262
batch_size: usize,
253263
thread_num: Option<usize>,
254264
projection: Option<Vec<usize>>,
265+
coordinate_system_zero_based: bool,
255266
) -> datafusion::error::Result<impl futures::Stream<Item = datafusion::error::Result<RecordBatch>>>
256267
{
257268
let mut name: Vec<Option<String>> = Vec::with_capacity(batch_size);
@@ -298,7 +309,9 @@ async fn get_local_bam(
298309
};
299310
match record.alignment_start() {
300311
Some(start_pos) => {
301-
start.push(Some(start_pos?.get() as u32));
312+
// noodles normalizes all positions to 1-based; subtract 1 for 0-based output
313+
let pos = start_pos?.get() as u32;
314+
start.push(Some(if coordinate_system_zero_based { pos - 1 } else { pos }));
302315
},
303316
None => {
304317
start.push(None);
@@ -335,7 +348,11 @@ async fn get_local_bam(
335348
);
336349
mate_chrom.push(chrom_name);
337350
match record.mate_alignment_start() {
338-
Some(start) => mate_start.push(Some(start?.get() as u32)),
351+
Some(mate_start_pos) => {
352+
// noodles normalizes all positions to 1-based; subtract 1 for 0-based output
353+
let pos = mate_start_pos?.get() as u32;
354+
mate_start.push(Some(if coordinate_system_zero_based { pos - 1 } else { pos }));
355+
},
339356
None => mate_start.push(None),
340357
};
341358

@@ -484,6 +501,7 @@ async fn get_stream(
484501
thread_num: Option<usize>,
485502
projection: Option<Vec<usize>>,
486503
object_storage_options: Option<ObjectStorageOptions>,
504+
coordinate_system_zero_based: bool,
487505
) -> datafusion::error::Result<SendableRecordBatchStream> {
488506
// Open the BAM file (local or remote) and stream its records.
489507

@@ -499,6 +517,7 @@ async fn get_stream(
499517
batch_size,
500518
thread_num,
501519
projection,
520+
coordinate_system_zero_based,
502521
)
503522
.await?;
504523
Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))
@@ -510,6 +529,7 @@ async fn get_stream(
510529
batch_size,
511530
projection,
512531
object_storage_options,
532+
coordinate_system_zero_based,
513533
)
514534
.await?;
515535
Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))

datafusion/bio-format-bam/src/table_provider.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ use datafusion::physical_plan::{
99
ExecutionPlan, PlanProperties,
1010
execution_plan::{Boundedness, EmissionType},
1111
};
12+
use datafusion_bio_format_core::COORDINATE_SYSTEM_METADATA_KEY;
1213
use datafusion_bio_format_core::object_storage::ObjectStorageOptions;
1314
use log::debug;
1415
use std::any::Any;
16+
use std::collections::HashMap;
1517
use std::sync::Arc;
1618

17-
fn determine_schema() -> datafusion::common::Result<SchemaRef> {
19+
fn determine_schema(coordinate_system_zero_based: bool) -> datafusion::common::Result<SchemaRef> {
1820
let fields = vec![
1921
Field::new("name", DataType::Utf8, true),
2022
Field::new("chrom", DataType::Utf8, true),
@@ -28,7 +30,13 @@ fn determine_schema() -> datafusion::common::Result<SchemaRef> {
2830
Field::new("sequence", DataType::Utf8, false),
2931
Field::new("quality_scores", DataType::Utf8, false),
3032
];
31-
let schema = Schema::new(fields);
33+
// Add coordinate system metadata to schema
34+
let mut metadata = HashMap::new();
35+
metadata.insert(
36+
COORDINATE_SYSTEM_METADATA_KEY.to_string(),
37+
coordinate_system_zero_based.to_string(),
38+
);
39+
let schema = Schema::new_with_metadata(fields, metadata);
3240
debug!("Schema: {:?}", schema);
3341
Ok(Arc::new(schema))
3442
}
@@ -48,6 +56,8 @@ pub struct BamTableProvider {
4856
thread_num: Option<usize>,
4957
/// Configuration for cloud storage access
5058
object_storage_options: Option<ObjectStorageOptions>,
59+
/// If true, output 0-based half-open coordinates; if false, 1-based closed coordinates
60+
coordinate_system_zero_based: bool,
5161
}
5262

5363
impl BamTableProvider {
@@ -58,6 +68,8 @@ impl BamTableProvider {
5868
/// * `file_path` - Path to the BAM file (local or remote URL)
5969
/// * `thread_num` - Optional number of threads for BGZF decompression
6070
/// * `object_storage_options` - Optional configuration for cloud storage access
71+
/// * `coordinate_system_zero_based` - If true (default), output 0-based half-open coordinates;
72+
/// if false, output 1-based closed coordinates
6173
///
6274
/// # Returns
6375
///
@@ -73,6 +85,7 @@ impl BamTableProvider {
7385
/// "data/alignments.bam".to_string(),
7486
/// Some(4), // Use 4 threads
7587
/// None, // No cloud storage
88+
/// true, // Use 0-based coordinates (default)
7689
/// )?;
7790
/// # Ok(())
7891
/// # }
@@ -81,13 +94,15 @@ impl BamTableProvider {
8194
file_path: String,
8295
thread_num: Option<usize>,
8396
object_storage_options: Option<ObjectStorageOptions>,
97+
coordinate_system_zero_based: bool,
8498
) -> datafusion::common::Result<Self> {
85-
let schema = determine_schema()?;
99+
let schema = determine_schema(coordinate_system_zero_based)?;
86100
Ok(Self {
87101
file_path,
88102
schema,
89103
thread_num,
90104
object_storage_options,
105+
coordinate_system_zero_based,
91106
})
92107
}
93108
}
@@ -146,6 +161,7 @@ impl TableProvider for BamTableProvider {
146161
limit,
147162
thread_num: self.thread_num,
148163
object_storage_options: self.object_storage_options.clone(),
164+
coordinate_system_zero_based: self.coordinate_system_zero_based,
149165
}))
150166
}
151167
}

datafusion/bio-format-bed/examples/test_bed_reader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3232
BEDFields::BED4,
3333
Some(1),
3434
Some(object_storage_options),
35+
true, // Use 0-based coordinates (default)
3536
)
3637
.unwrap();
3738
let ctx = datafusion::execution::context::SessionContext::new();

datafusion/bio-format-bed/src/physical_exec.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub struct BedExec {
4141
pub(crate) thread_num: Option<usize>,
4242
/// Optional cloud storage configuration
4343
pub(crate) object_storage_options: Option<ObjectStorageOptions>,
44+
/// If true, output 0-based half-open coordinates; if false, 1-based closed coordinates
45+
pub(crate) coordinate_system_zero_based: bool,
4446
}
4547

4648
impl Debug for BedExec {
@@ -95,7 +97,7 @@ impl ExecutionPlan for BedExec {
9597
_partition: usize,
9698
context: Arc<TaskContext>,
9799
) -> datafusion::common::Result<SendableRecordBatchStream> {
98-
debug!("GffExec::execute");
100+
debug!("BedExec::execute");
99101
debug!("Projection: {:?}", self.projection);
100102
let batch_size = context.session_config().batch_size();
101103
let schema = self.schema.clone();
@@ -107,6 +109,7 @@ impl ExecutionPlan for BedExec {
107109
self.thread_num,
108110
self.projection.clone(),
109111
self.object_storage_options.clone(),
112+
self.coordinate_system_zero_based,
110113
);
111114
let stream = futures::stream::once(fut).try_flatten();
112115
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
@@ -123,13 +126,15 @@ impl ExecutionPlan for BedExec {
123126
/// * `batch_size` - Number of records per batch
124127
/// * `projection` - Optional column projection
125128
/// * `object_storage_options` - Cloud storage configuration
129+
/// * `coordinate_system_zero_based` - If true, output 0-based coordinates; if false, 1-based
126130
async fn get_remote_bed_stream(
127131
file_path: String,
128132
bed_fields: BEDFields,
129133
schema: SchemaRef,
130134
batch_size: usize,
131135
projection: Option<Vec<usize>>,
132136
object_storage_options: Option<ObjectStorageOptions>,
137+
coordinate_system_zero_based: bool,
133138
) -> datafusion::error::Result<
134139
AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output = ()> + Sized>,
135140
> {
@@ -158,8 +163,16 @@ async fn get_remote_bed_stream(
158163
while let Some(result) = records.next().await {
159164
let record = result?; // propagate errors if any
160165
chroms.push(record.reference_sequence_name().to_string());
161-
poss.push(record.feature_start()?.get() as u32);
162-
pose.push(record.feature_end().unwrap()?.get() as u32);
166+
// BED files store 0-based starts, but noodles exposes them as 1-based positions, so feature_start().get() returns 1-based.
167+
// For 0-based output: subtract 1 to get back to 0-based
168+
// For 1-based output: use as-is (noodles already returns 1-based)
169+
let start_pos = record.feature_start()?.get() as u32;
170+
poss.push(if coordinate_system_zero_based { start_pos - 1 } else { start_pos });
171+
// End position: noodles returns 1-based exclusive end
172+
// For 0-based half-open: subtract 1 to get 0-based exclusive end
173+
// For 1-based closed: use as-is
174+
let end_pos = record.feature_end().unwrap()?.get() as u32;
175+
pose.push(if coordinate_system_zero_based { end_pos - 1 } else { end_pos });
163176
name.push(record.name().map(|n| n.to_string()));
164177

165178
record_num += 1;
@@ -195,7 +208,6 @@ async fn get_remote_bed_stream(
195208
&pose,
196209
&name,
197210
projection.clone(),
198-
// if infos.is_empty() { None } else { Some(&infos) },
199211
)?;
200212
yield batch;
201213
}
@@ -213,13 +225,15 @@ async fn get_remote_bed_stream(
213225
/// * `batch_size` - Number of records per batch
214226
/// * `thread_num` - Number of threads for parallel BGZF decompression
215227
/// * `projection` - Optional column projection
228+
/// * `coordinate_system_zero_based` - If true, output 0-based coordinates; if false, 1-based
216229
async fn get_local_bed(
217230
file_path: String,
218231
bed_fields: BEDFields,
219232
schema: SchemaRef,
220233
batch_size: usize,
221234
thread_num: Option<usize>,
222235
projection: Option<Vec<usize>>,
236+
coordinate_system_zero_based: bool,
223237
) -> datafusion::error::Result<impl futures::Stream<Item = datafusion::error::Result<RecordBatch>>>
224238
{
225239
let mut reader = match bed_fields {
@@ -244,8 +258,16 @@ async fn get_local_bed(
244258
while let Some(result) = records.next().await {
245259
let record = result?; // propagate errors if any
246260
chroms.push(record.reference_sequence_name().to_string());
247-
poss.push(record.feature_start()?.get() as u32);
248-
pose.push(record.feature_end().unwrap()?.get() as u32);
261+
// BED files store 0-based starts, but noodles exposes them as 1-based positions, so feature_start().get() returns 1-based.
262+
// For 0-based output: subtract 1 to get back to 0-based
263+
// For 1-based output: use as-is (noodles already returns 1-based)
264+
let start_pos = record.feature_start()?.get() as u32;
265+
poss.push(if coordinate_system_zero_based { start_pos - 1 } else { start_pos });
266+
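// End position: noodles returns 1-based exclusive end — NOTE(review): noodles documents feature_end() as 1-based inclusive (numerically equal to the 0-based exclusive end); confirm this description and the `- 1` below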
267+
// For 0-based half-open: subtract 1 to get 0-based exclusive end
268+
// For 1-based closed: use as-is
269+
let end_pos = record.feature_end().unwrap()?.get() as u32;
270+
pose.push(if coordinate_system_zero_based { end_pos - 1 } else { end_pos });
249271
name.push(record.name().map(|n| n.to_string()));
250272

251273
record_num += 1;
@@ -279,8 +301,8 @@ async fn get_local_bed(
279301
&chroms,
280302
&poss,
281303
&pose,
282-
&name
283-
, projection.clone(),
304+
&name,
305+
projection.clone(),
284306
)?;
285307
yield batch;
286308
}
@@ -351,6 +373,7 @@ fn build_record_batch(
351373
/// * `thread_num` - Optional thread count for parallel reading
352374
/// * `projection` - Optional column projection
353375
/// * `object_storage_options` - Cloud storage configuration
376+
/// * `coordinate_system_zero_based` - If true, output 0-based coordinates; if false, 1-based
354377
async fn get_stream(
355378
file_path: String,
356379
bed_fields: BEDFields,
@@ -359,8 +382,9 @@ async fn get_stream(
359382
thread_num: Option<usize>,
360383
projection: Option<Vec<usize>>,
361384
object_storage_options: Option<ObjectStorageOptions>,
385+
coordinate_system_zero_based: bool,
362386
) -> datafusion::error::Result<SendableRecordBatchStream> {
363-
// Open the BGZF-indexed VCF using IndexedReader.
387+
// Open the BGZF-indexed BED file.
364388

365389
let file_path = file_path.clone();
366390
let store_type = get_storage_type(file_path.clone());
@@ -375,6 +399,7 @@ async fn get_stream(
375399
batch_size,
376400
thread_num,
377401
projection,
402+
coordinate_system_zero_based,
378403
)
379404
.await?;
380405
Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))
@@ -387,6 +412,7 @@ async fn get_stream(
387412
batch_size,
388413
projection,
389414
object_storage_options,
415+
coordinate_system_zero_based,
390416
)
391417
.await?;
392418
Ok(Box::pin(RecordBatchStreamAdapter::new(schema_ref, stream)))

0 commit comments

Comments
 (0)