diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml
index ce15decfe..d45e58fe7 100644
--- a/wayang-platforms/wayang-java/pom.xml
+++ b/wayang-platforms/wayang-java/pom.xml
@@ -50,16 +50,13 @@
1.1.2-SNAPSHOT
- org.apache.hadoop
- hadoop-common
-
-
- org.apache.hadoop
- hadoop-hdfs
+ org.apache.parquet
+ parquet-avro
+ 1.15.2
org.apache.parquet
- parquet-avro
+ parquet-hadoop
1.15.2
@@ -72,11 +69,6 @@
slf4j-api
2.0.6
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- 2.20.0
-
org.mockito
@@ -86,4 +78,4 @@
-
+
\ No newline at end of file
diff --git a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaParquetSourceTest.java b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaParquetSourceTest.java
new file mode 100644
index 000000000..79491015d
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaParquetSourceTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test suite for {@link JavaParquetSource}.
+ */
+class JavaParquetSourceTest extends JavaExecutionOperatorTestBase {
+
+    /** Reads the sample file with a {@code null} projection and checks both fields of all three records. */
+    @Test
+    void testReadAllColumns() {
+        List<Record> result = readRecords(null);
+        assertEquals(3, result.size());
+        assertEquals("alice", result.get(0).getString(0));
+        assertEquals(30, result.get(0).getInt(1));
+        assertEquals("bob", result.get(1).getString(0));
+        assertEquals(25, result.get(1).getInt(1));
+        assertEquals("carol", result.get(2).getString(0));
+        assertEquals(41, result.get(2).getInt(1));
+    }
+
+    /** Reads the sample file projected onto {@code "name"} and checks the records carry only that field. */
+    @Test
+    void testReadWithProjection() {
+        List<Record> result = readRecords(new String[]{"name"});
+        assertEquals(3, result.size());
+        // Each projected record contains only the "name" column
+        assertEquals(1, result.get(0).size());
+        assertEquals("alice", result.get(0).getString(0));
+        assertEquals("bob", result.get(1).getString(0));
+        assertEquals("carol", result.get(2).getString(0));
+    }
+
+    /**
+     * Evaluates a {@link JavaParquetSource} over {@code src/test/resources/data.parquet}.
+     *
+     * @param projection projection handed to the source unchanged; the all-columns test passes {@code null}
+     * @return the records collected from the source's stream output channel
+     */
+    private List<Record> readRecords(String[] projection) {
+        Path parquetFile = Paths.get("src/test/resources/data.parquet").toAbsolutePath();
+        JavaParquetSource source = new JavaParquetSource(parquetFile.toUri().toString(), projection);
+        ChannelInstance[] outputs = new ChannelInstance[]{createStreamChannelInstance()};
+        evaluate(source, new ChannelInstance[0], outputs);
+        // Explicit <Record> witness: with a raw List the getString/getInt calls in the tests do not compile.
+        return ((StreamChannel.Instance) outputs[0]).<Record>provideStream().toList();
+    }
+
+    /*
+     * Reference only: the code below was used to create src/test/resources/data.parquet. It is not run as
+     * part of the test suite because it adds a dependency on Hadoop and Parquet libraries and complicates setup.
+
+    private static final Schema SCHEMA = SchemaBuilder.record("TestRecord")
+            .namespace("org.apache.wayang.test")
+            .fields()
+            .requiredString("name")
+            .requiredInt("age")
+            .endRecord();
+
+    private static Path writeSampleParquet(Path dir) throws IOException {
+        Path file = dir.resolve("data.parquet");
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
+        List<GenericRecord> records = Arrays.asList(
+                makeRecord("alice", 30),
+                makeRecord("bob", 25),
+                makeRecord("carol", 41)
+        );
+        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(hadoopPath)
+                .withSchema(SCHEMA)
+                .withConf(new Configuration())
+                .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+                .build()) {
+            for (GenericRecord record : records) {
+                writer.write(record);
+            }
+        }
+        return file;
+    }
+
+    private static GenericRecord makeRecord(String name, int age) {
+        GenericRecord record = new GenericData.Record(SCHEMA);
+        record.put("name", name);
+        record.put("age", age);
+        return record;
+    }
+    */
+}
diff --git a/wayang-platforms/wayang-java/src/test/resources/data.parquet b/wayang-platforms/wayang-java/src/test/resources/data.parquet
new file mode 100644
index 000000000..403d72be1
Binary files /dev/null and b/wayang-platforms/wayang-java/src/test/resources/data.parquet differ