Skip to content

Commit 67a5074

Browse files
natirpierre-marijon
authored andcommitted
feat: implement bucket transform
For type: - Int32 - Int64 - Date32 - Time32 - Utf8/String
1 parent e193f0a commit 67a5074

File tree

3 files changed

+179
-2
lines changed

3 files changed

+179
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ itertools = "0.14.0"
3838
lazy_static = "1.5.0"
3939
lru = "0.16.0"
4040
object_store = { version = "0.12", features = ["aws", "gcp"] }
41+
fasthash = { version = "0.4" }
4142
parquet = { version = "56", features = ["async", "object_store"] }
4243
pin-project-lite = "0.2"
4344
regex = "1.11.1"

iceberg-rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ itertools = { workspace = true }
2424
lazy_static = { workspace = true }
2525
lru = { workspace = true }
2626
object_store = { workspace = true }
27+
fasthash = { workspace = true }
2728
parquet = { workspace = true }
2829
pin-project-lite = { workspace = true }
2930
regex = { workspace = true }
@@ -44,4 +45,3 @@ uuid = { workspace = true }
4445
rstest = "0.23.0"
4546
chrono = { workspace = true }
4647
iceberg-sql-catalog = { path = "../catalogs/iceberg-sql-catalog" }
47-

iceberg-rust/src/arrow/transform.rs

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
use std::sync::Arc;
1515

1616
use arrow::{
17-
array::{as_primitive_array, Array, ArrayRef, PrimitiveArray},
17+
array::{as_primitive_array, downcast_array, Array, ArrayRef, PrimitiveArray, StringArray},
18+
buffer::ScalarBuffer,
1819
compute::{binary, cast, date_part, unary, DatePart},
1920
datatypes::{
2021
DataType, Date32Type, Int16Type, Int32Type, Int64Type, TimeUnit, TimestampMicrosecondType,
@@ -43,6 +44,14 @@ static MICROS_IN_DAY: i64 = 86_400_000_000;
4344
/// * Month - Extracts month from date32 or timestamp
4445
/// * Year - Extracts year from date32 or timestamp
4546
/// * Hour - Extracts hour from timestamp
47+
/// * Int16 - Truncate value
48+
/// * Int32 - Truncate value
49+
/// * Int64 - Truncate value
50+
/// * Int32 - Use hash of value to repart it between bucket
51+
/// * Int64 - Use hash of value to repart it between bucket
52+
/// * Date32 - Use hash of value to repart it between bucket
53+
/// * Time32 - Use hash of value to repart it between bucket
54+
/// * Utf8 - Use hash of value to repart it between bucket
4655
pub fn transform_arrow(array: ArrayRef, transform: &Transform) -> Result<ArrayRef, ArrowError> {
4756
match (array.data_type(), transform) {
4857
(_, Transform::Identity) => Ok(array),
@@ -114,6 +123,56 @@ pub fn transform_arrow(array: ArrayRef, transform: &Transform) -> Result<ArrayRe
114123
i - i.rem_euclid(*m as i64)
115124
}),
116125
)),
126+
(DataType::Int32, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
127+
unary(as_primitive_array::<Int32Type>(&array), |i| {
128+
(fasthash::murmur3::hash32_with_seed((i as i64).to_le_bytes(), 0) as i32)
129+
.rem_euclid(*m as i32)
130+
}),
131+
)),
132+
(DataType::Int64, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
133+
unary(as_primitive_array::<Int64Type>(&array), |i| {
134+
(fasthash::murmur3::hash32_with_seed(i.to_le_bytes(), 0) as i32)
135+
.rem_euclid(*m as i32)
136+
}),
137+
)),
138+
(DataType::Date32, Transform::Bucket(m)) => {
139+
let temp = cast(&array, &DataType::Int32)?;
140+
141+
Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
142+
as_primitive_array::<Int32Type>(&temp),
143+
|i| {
144+
(fasthash::murmur3::hash32_with_seed(i.to_le_bytes(), 0) as i32)
145+
.rem_euclid(*m as i32)
146+
},
147+
)))
148+
}
149+
(DataType::Time32(TimeUnit::Millisecond), Transform::Bucket(m)) => {
150+
let temp = cast(&array, &DataType::Int32)?;
151+
152+
Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
153+
as_primitive_array::<Int32Type>(&temp),
154+
|i: i32| {
155+
(fasthash::murmur3::hash32_with_seed((i as i64).to_le_bytes(), 0) as i32)
156+
.rem_euclid(*m as i32)
157+
},
158+
)))
159+
}
160+
(DataType::Utf8, Transform::Bucket(m)) => {
161+
let nulls = array.nulls();
162+
let local_array: StringArray = downcast_array::<StringArray>(&array);
163+
164+
Ok(Arc::new(PrimitiveArray::<Int32Type>::new(
165+
ScalarBuffer::from_iter(local_array.iter().map(|a| {
166+
if let Some(value) = a {
167+
fasthash::murmur3::hash32_with_seed(value.as_bytes(), 0) as i32
168+
} else {
169+
0
170+
}
171+
.rem_euclid(*m as i32)
172+
})),
173+
nulls.cloned(),
174+
)))
175+
}
117176
_ => Err(ArrowError::ComputeError(
118177
"Failed to perform transform for datatype".to_string(),
119178
)),
@@ -322,6 +381,123 @@ mod tests {
322381
assert_eq!(&expected, &result);
323382
}
324383

384+
#[test]
385+
fn test_bucket_hash_value() {
386+
// Check value match https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
387+
388+
// 34 -> 2017239379
389+
assert_eq!(
390+
fasthash::murmur3::hash32_with_seed((34i32 as i64).to_le_bytes(), 0),
391+
2017239379
392+
);
393+
// 34 -> 2017239379
394+
assert_eq!(
395+
fasthash::murmur3::hash32_with_seed((34i64).to_le_bytes(), 0),
396+
2017239379
397+
);
398+
// daysFromUnixEpoch(2017-11-16) -> 17_486 -> -653330422
399+
assert_eq!(
400+
fasthash::murmur3::hash32_with_seed((17_486i32 as i64).to_le_bytes(), 0) as i32,
401+
-653330422
402+
);
403+
// 81_068_000_000 number of micros from midnight 22:31:08
404+
assert_eq!(
405+
fasthash::murmur3::hash32_with_seed((81_068_000_000i64).to_le_bytes(), 0) as i32,
406+
-662762989
407+
);
408+
409+
// utf8Bytes(iceberg) -> 1210000089
410+
assert_eq!(
411+
fasthash::murmur3::hash32_with_seed("iceberg".as_bytes(), 0) as i32,
412+
1210000089
413+
);
414+
}
415+
416+
#[test]
417+
fn test_int32_bucket_transform() {
418+
assert_eq!(
419+
fasthash::murmur3::hash32_with_seed(17_486i64.to_le_bytes(), 0) as i32,
420+
-653_330_422
421+
);
422+
423+
let array = Arc::new(arrow::array::Int32Array::from(vec![
424+
Some(34), // Spec value
425+
Some(17_486), // number of day between 2017-11-16 and epoch
426+
Some(84668000), // number of micros from midnight 22:31:08
427+
Some(-2000),
428+
Some(0),
429+
None,
430+
])) as ArrayRef;
431+
let result = transform_arrow(array, &Transform::Bucket(2_017_239_380)).unwrap();
432+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
433+
Some(2017239379),
434+
Some(1363908958),
435+
Some(988822981),
436+
Some(964620854),
437+
Some(1669671676),
438+
None,
439+
])) as ArrayRef;
440+
assert_eq!(&expected, &result);
441+
}
442+
443+
#[test]
444+
fn test_int64_bucket_transform() {
445+
let array = Arc::new(arrow::array::Int64Array::from(vec![
446+
Some(34), // Spec value
447+
Some(17_486), // number of day between 2017-11-16 and epoch
448+
Some(2000),
449+
Some(-2000),
450+
Some(0),
451+
None,
452+
])) as ArrayRef;
453+
let result = transform_arrow(array, &Transform::Bucket(2_017_239_380)).unwrap();
454+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
455+
Some(2017239379),
456+
Some(1363908958),
457+
Some(716_914_497),
458+
Some(964_620_854),
459+
Some(1669671676),
460+
None,
461+
])) as ArrayRef;
462+
assert_eq!(&expected, &result);
463+
}
464+
465+
#[test]
466+
fn test_date32_bucket_transform() {
467+
let array = Arc::new(arrow::array::Date32Array::from(vec![
468+
Some(17_486), // number of day between 2017-11-16
469+
None,
470+
])) as ArrayRef;
471+
let result = transform_arrow(array, &Transform::Bucket(2_017_239_380)).unwrap();
472+
let expected =
473+
Arc::new(arrow::array::Int32Array::from(vec![Some(1087118125), None])) as ArrayRef;
474+
assert_eq!(&expected, &result);
475+
}
476+
477+
#[test]
478+
fn test_time32_bucket_transform() {
479+
let array = Arc::new(arrow::array::Time32MillisecondArray::from(vec![
480+
Some(81_068_000), // number of number of micros from midnight 22:31:08
481+
None,
482+
])) as ArrayRef;
483+
let result = transform_arrow(array, &Transform::Bucket(29_824_073)).unwrap();
484+
let expected =
485+
Arc::new(arrow::array::Int32Array::from(vec![Some(10797584), None])) as ArrayRef;
486+
assert_eq!(&expected, &result);
487+
}
488+
489+
#[test]
490+
fn test_utf8_bucket_transform() {
491+
let array =
492+
Arc::new(arrow::array::StringArray::from(vec![Some("iceberg"), None])) as ArrayRef;
493+
let result = transform_arrow(array, &Transform::Bucket(1_210_000_090)).unwrap();
494+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
495+
Some(1_210_000_089),
496+
None,
497+
])) as ArrayRef;
498+
assert_eq!(&expected, &result);
499+
}
500+
325501
#[test]
326502
fn test_unsupported_transform() {
327503
let array = Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef;

0 commit comments

Comments
 (0)