Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 123 additions & 0 deletions encodings/parquet-variant/src/arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::Array as _;
use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
use parquet_variant_compute::VariantArray as ArrowVariantArray;
use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VTable;
use vortex_array::arrow::ArrowExport;
use vortex_array::arrow::ArrowExportVTable;
use vortex_array::arrow::ArrowImport;
use vortex_array::arrow::ArrowImportVTable;
use vortex_array::arrow::ArrowSession;
use vortex_array::dtype::DType;
use vortex_array::dtype::extension::ExtDTypeRef;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::registry::CachedId;
use vortex_session::registry::Id;

use crate::ParquetVariant;
use crate::ParquetVariantArrayExt;

/// Arrow canonical extension name for Parquet Variant storage.
pub const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant";

/// Cached registry id built from [`PARQUET_VARIANT_ARROW_EXTENSION_NAME`].
///
/// Dereferenced by the `arrow_ext_id` methods of both the Arrow export and
/// Arrow import vtable implementations for `ParquetVariant` in this file.
static ARROW_PARQUET_VARIANT: CachedId = CachedId::new(PARQUET_VARIANT_ARROW_EXTENSION_NAME);

/// Arrow export hooks for Variant arrays backed by [`ParquetVariant`] storage.
impl ArrowExportVTable for ParquetVariant {
    /// Id of the Arrow extension (`arrow.parquet.variant`) this exporter is keyed on.
    fn arrow_ext_id(&self) -> Id {
        *ARROW_PARQUET_VARIANT
    }

    /// Id of the Vortex-side `ParquetVariant` encoding.
    fn vortex_ext_id(&self) -> Id {
        ParquetVariant.id()
    }

    /// Always returns `Ok(None)`; this exporter never proposes an Arrow field itself.
    fn to_arrow_field(
        &self,
        _name: &str,
        _dtype: &ExtDTypeRef,
        _session: &ArrowSession,
    ) -> VortexResult<Option<Field>> {
        Ok(None)
    }

    /// Exports `array` as the Arrow struct storage of a Parquet Variant array.
    ///
    /// Returns [`ArrowExport::Unsupported`] (handing `array` back untouched) unless
    /// `target` carries the `arrow.parquet.variant` extension name in its metadata
    /// and `array` has a variant dtype.
    ///
    /// # Errors
    ///
    /// Fails if executing the array does not yield `ParquetVariant` storage, or if
    /// execution or the Arrow conversion itself fails.
    fn execute_arrow(
        &self,
        array: ArrayRef,
        target: &Field,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ArrowExport> {
        let target_is_variant_extension = target
            .metadata()
            .get(EXTENSION_TYPE_NAME_KEY)
            .is_some_and(|name| name.as_str() == PARQUET_VARIANT_ARROW_EXTENSION_NAME);

        // The dtype is only consulted once the target field is known to match.
        if !target_is_variant_extension || !array.dtype().is_variant() {
            return Ok(ArrowExport::Unsupported(array));
        }

        let executed = array.execute_until::<ParquetVariant>(ctx)?;
        let Some(parquet_array) = executed.as_opt::<ParquetVariant>() else {
            return Err(vortex_err!(
                "cannot export Variant without ParquetVariant storage"
            ));
        };

        let storage = parquet_array.to_arrow(ctx)?.into_inner();
        Ok(ArrowExport::Exported(Arc::new(storage)))
    }
}

/// Arrow import hooks turning `arrow.parquet.variant` struct arrays into
/// [`ParquetVariant`] arrays.
impl ArrowImportVTable for ParquetVariant {
    /// Id of the Arrow extension (`arrow.parquet.variant`) this importer is keyed on.
    fn arrow_ext_id(&self) -> Id {
        *ARROW_PARQUET_VARIANT
    }

    /// Maps an Arrow field to `DType::Variant`, copying the field's nullability.
    ///
    /// Returns `Ok(None)` when the field metadata does not name the
    /// `arrow.parquet.variant` extension.
    fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>> {
        let names_variant_extension = field
            .metadata()
            .get(EXTENSION_TYPE_NAME_KEY)
            .is_some_and(|name| name.as_str() == PARQUET_VARIANT_ARROW_EXTENSION_NAME);

        Ok(names_variant_extension.then(|| DType::Variant(field.is_nullable().into())))
    }

    /// Imports an Arrow struct array tagged as Parquet Variant.
    ///
    /// Returns [`ArrowImport::Unsupported`] (handing `array` back untouched) unless
    /// `dtype` is `DType::Variant`, `field` names the `arrow.parquet.variant`
    /// extension, and `array` is a struct array. The nullable or non-nullable
    /// constructor is chosen from `dtype`.
    ///
    /// # Errors
    ///
    /// Fails if the struct cannot be wrapped as an Arrow variant array or if the
    /// conversion into `ParquetVariant` fails.
    fn from_arrow_array(
        &self,
        array: ArrowArrayRef,
        field: &Field,
        dtype: &DType,
    ) -> VortexResult<ArrowImport> {
        // Guards run in the same order as a short-circuiting `||` chain would.
        if !matches!(dtype, DType::Variant(_)) {
            return Ok(ArrowImport::Unsupported(array));
        }

        let extension_name = field
            .metadata()
            .get(EXTENSION_TYPE_NAME_KEY)
            .map(String::as_str);
        if extension_name != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) {
            return Ok(ArrowImport::Unsupported(array));
        }

        if !matches!(array.data_type(), DataType::Struct(_)) {
            return Ok(ArrowImport::Unsupported(array));
        }

        let variant = ArrowVariantArray::try_new(array.as_struct())?;
        let converted = if dtype.is_nullable() {
            ParquetVariant::from_arrow_variant_nullable(&variant)?
        } else {
            ParquetVariant::from_arrow_variant(&variant)?
        };
        Ok(ArrowImport::Imported(converted.into_array()))
    }
}
99 changes: 99 additions & 0 deletions encodings/parquet-variant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,110 @@
//! [Arrow canonical extension type]: https://arrow.apache.org/docs/format/CanonicalExtensions.html#parquet-variant

mod array;
mod arrow;
mod kernel;
mod operations;
mod validity;
mod vtable;

use std::sync::Arc;

pub use array::ParquetVariantArrayExt;
pub use arrow::PARQUET_VARIANT_ARROW_EXTENSION_NAME;
use vortex_array::arrow::ArrowSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;
pub use vtable::ParquetVariant;
pub use vtable::ParquetVariantArray;

/// Register Parquet Variant support with `session`.
///
/// Registers the `ParquetVariant` array encoding first, then registers it as
/// both an Arrow exporter and an Arrow importer.
pub fn initialize(session: &VortexSession) {
    session.arrays().register(ParquetVariant);

    // Each registry receives its own handle, as separate `Arc` allocations.
    let exporter = Arc::new(ParquetVariant);
    let importer = Arc::new(ParquetVariant);
    session.arrow().register_exporter(exporter);
    session.arrow().register_importer(importer);
}

#[cfg(test)]
mod arrow_session_tests {
    use std::sync::Arc;

    use arrow_array::Array as _;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::Field;
    use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
    use parquet_variant::Variant as PqVariant;
    use parquet_variant_compute::VariantArrayBuilder;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrow::ArrowSessionExt;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::session::ArraySession;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::ParquetVariant;

    /// Empty session with array support, plus everything `crate::initialize` registers.
    fn session() -> VortexSession {
        let configured = VortexSession::empty().with::<ArraySession>();
        crate::initialize(&configured);
        configured
    }

    /// Three-row Arrow variant storage: an `i8`, a `bool` and a string.
    fn arrow_variant_storage() -> StructArray {
        let mut builder = VariantArrayBuilder::new(3);
        for variant in [
            PqVariant::from(42i8),
            PqVariant::from(true),
            PqVariant::from("vortex"),
        ] {
            builder.append_variant(variant);
        }
        builder.build().into_inner()
    }

    /// Non-nullable field over `storage`, tagged with the variant extension name.
    ///
    /// The extension name is spelled out literally so the test pins the wire value.
    fn arrow_variant_field(storage: &StructArray) -> Field {
        let metadata = std::collections::HashMap::from([(
            EXTENSION_TYPE_NAME_KEY.to_string(),
            "arrow.parquet.variant".to_string(),
        )]);
        Field::new("variant", storage.data_type().clone(), false).with_metadata(metadata)
    }

    #[test]
    fn arrow_session_imports_parquet_variant_extension_array() -> VortexResult<()> {
        let session = session();
        let storage = arrow_variant_storage();
        let field = arrow_variant_field(&storage);

        let arrow_input: ArrowArrayRef = Arc::new(storage);
        let imported = session.arrow().from_arrow_array(arrow_input, &field)?;

        assert_eq!(imported.dtype(), &DType::Variant(Nullability::NonNullable));
        assert!(imported.as_opt::<ParquetVariant>().is_some());
        Ok(())
    }

    #[test]
    fn arrow_session_exports_parquet_variant_extension_array() -> VortexResult<()> {
        let session = session();
        let storage = arrow_variant_storage();
        let field = arrow_variant_field(&storage);

        let arrow_input: ArrowArrayRef = Arc::new(storage.clone());
        let imported = session.arrow().from_arrow_array(arrow_input, &field)?;

        let mut ctx = session.create_execution_ctx();
        let round_tripped = session
            .arrow()
            .execute_arrow(imported, Some(&field), &mut ctx)?;
        let exported = round_tripped.as_struct();

        assert_eq!(exported.len(), storage.len());
        assert_eq!(exported.column_names(), storage.column_names());
        assert_eq!(exported.fields(), storage.fields());
        exported
            .columns()
            .iter()
            .zip(storage.columns())
            .for_each(|(actual, expected)| assert_eq!(actual.to_data(), expected.to_data()));
        Ok(())
    }
}
4 changes: 3 additions & 1 deletion java/vortex-jni/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ mavenPublishing {
coordinates(groupId = "dev.vortex", artifactId = "vortex-jni", version = "${rootProject.version}")
publishToMavenCentral()

signAllPublications()
if (!project.hasProperty("skip.signing")) {
signAllPublications()
}

pom {
name = "vortex-jni"
Expand Down
97 changes: 97 additions & 0 deletions java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dev.vortex.jni;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -26,17 +27,25 @@
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public final class JNIWriterTest {
private static final String ARROW_EXTENSION_NAME = "ARROW:extension:name";
private static final String PARQUET_VARIANT_EXTENSION_NAME = "arrow.parquet.variant";
private static final byte[] VARIANT_METADATA = new byte[] {0x01, 0x00};
private static final byte[] VARIANT_INT8_42 = new byte[] {0x0c, 0x2a};
private static final byte[] VARIANT_TRUE = new byte[] {0x04};

@TempDir
Path tempDir;
Expand All @@ -52,6 +61,45 @@ private static Schema personSchema() {
Field.notNullable("age", new ArrowType.Int(32, true))));
}

/**
 * Builds a single-column schema whose nullable struct field "variant" is tagged with the
 * {@code arrow.parquet.variant} extension name and has a non-nullable binary "metadata" child
 * and a nullable binary "value" child.
 */
private static Schema parquetVariantSchema() {
    Map<String, String> extensionMetadata = Map.of(ARROW_EXTENSION_NAME, PARQUET_VARIANT_EXTENSION_NAME);
    FieldType variantType = new FieldType(true, ArrowType.Struct.INSTANCE, null, extensionMetadata);

    Field metadataChild = Field.notNullable("metadata", new ArrowType.Binary());
    Field valueChild = Field.nullable("value", new ArrowType.Binary());

    Field variantField = new Field("variant", variantType, List.of(metadataChild, valueChild));
    return new Schema(List.of(variantField));
}

/**
 * Fills the "variant" struct column of {@code root} with three rows: an int8 value, a boolean
 * value, and a null struct entry (whose "value" child is also null).
 */
private static void populateParquetVariantRoot(VectorSchemaRoot root) {
    final int rowCount = 3;

    StructVector variantVector = (StructVector) root.getVector("variant");
    VarBinaryVector metadataVector = variantVector.getChild("metadata", VarBinaryVector.class);
    VarBinaryVector valueVector = variantVector.getChild("value", VarBinaryVector.class);

    variantVector.allocateNew();
    metadataVector.allocateNew(rowCount);
    valueVector.allocateNew(rowCount);

    // Every row, including the null one, carries the same metadata bytes.
    for (int row = 0; row < rowCount; row++) {
        metadataVector.setSafe(row, VARIANT_METADATA);
    }

    valueVector.setSafe(0, VARIANT_INT8_42);
    valueVector.setSafe(1, VARIANT_TRUE);
    valueVector.setNull(2);

    variantVector.setIndexDefined(0);
    variantVector.setIndexDefined(1);
    variantVector.setNull(2);

    metadataVector.setValueCount(rowCount);
    valueVector.setValueCount(rowCount);
    variantVector.setValueCount(rowCount);
    root.setRowCount(rowCount);
}

@Test
public void testCreateWriter() throws IOException {
Path outputPath = tempDir.resolve("test_create.vortex");
Expand Down Expand Up @@ -155,4 +203,53 @@ public void testWriteBatch() throws IOException {
}
}
}

/**
 * Writes a three-row Parquet Variant column through the JNI writer, then reads it back and
 * checks that the extension name survives on both the data source and scan schemas and that
 * the row contents round-trip.
 */
@Test
public void testParquetVariantRoundTrip() throws IOException {
    Path outputPath = tempDir.resolve("test_parquet_variant.vortex");
    String writePath = outputPath.toAbsolutePath().toUri().toString();

    BufferAllocator allocator = ArrowAllocation.rootAllocator();
    Schema schema = parquetVariantSchema();

    Session session = Session.create();
    try (VortexWriter writer = VortexWriter.create(session, writePath, schema, new HashMap<>(), allocator);
            VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
        populateParquetVariantRoot(root);

        try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
                ArrowSchema arrowSchemaFfi = ArrowSchema.allocateNew(allocator)) {
            Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchemaFfi);
            writer.writeBatch(arrowArray.memoryAddress(), arrowSchemaFfi.memoryAddress());
        }
    }

    assertTrue(Files.exists(outputPath), "output file should exist");

    DataSource ds = DataSource.open(session, writePath);
    Field dataSourceField = ds.arrowSchema(allocator).findField("variant");
    assertEquals(
            PARQUET_VARIANT_EXTENSION_NAME, dataSourceField.getMetadata().get(ARROW_EXTENSION_NAME));

    Scan scan = ds.scan(ScanOptions.of());
    Field scanField = scan.arrowSchema(allocator).findField("variant");
    assertEquals(PARQUET_VARIANT_EXTENSION_NAME, scanField.getMetadata().get(ARROW_EXTENSION_NAME));

    // Count batches so the test cannot pass vacuously when the scan yields no partitions.
    int batchesRead = 0;
    while (scan.hasNext()) {
        Partition p = scan.next();
        try (ArrowReader reader = p.scanArrow(allocator)) {
            assertTrue(reader.loadNextBatch());
            VectorSchemaRoot resultRoot = reader.getVectorSchemaRoot();
            // Exactly the three rows written by populateParquetVariantRoot must come back.
            assertEquals(3, resultRoot.getRowCount());

            StructVector variant = (StructVector) resultRoot.getVector("variant");
            VarBinaryVector metadata = variant.getChild("metadata", VarBinaryVector.class);
            VarBinaryVector value = variant.getChild("value", VarBinaryVector.class);

            assertArrayEquals(VARIANT_METADATA, metadata.get(0));
            assertArrayEquals(VARIANT_INT8_42, value.get(0));
            assertArrayEquals(VARIANT_METADATA, metadata.get(1));
            assertArrayEquals(VARIANT_TRUE, value.get(1));
            assertTrue(variant.isNull(2));
            batchesRead++;
        }
    }
    assertTrue(batchesRead > 0, "scan should yield at least one batch");
}
}
Loading
Loading