Parquet source: Expand column stats support, fix bugs in schema conversion #805

the-other-tim-brown wants to merge 4 commits into apache:main
Conversation
```diff
@@ -65,12 +65,6 @@ public static ParquetStatsExtractor getInstance() {
   private static PathBasedPartitionSpecExtractor partitionSpecExtractor =
```
partitionValueExtractor (line 63), partitionSpecExtractor (line 65), and getMaxFromColumnStats (line 68) appear to be dead code after the removal of toInternalDataFile. These should be cleaned up along with the unused import org.apache.hadoop.fs.*; on line 32.
```java
ColumnChunkMetaData first = chunks.get(0);
InternalField internalField =
    SchemaFieldFinder.getInstance()
        .findFieldByPath(internalSchema, first.getPath().toDotString());
```
SchemaFieldFinder.findFieldByPath() returns null if the column path doesn't exist in the schema (per its Javadoc). This null would silently propagate into the ColumnStat. Consider adding Objects.requireNonNull(internalField, "No field found for path: " + first.getPath().toDotString()).
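The suggested guard can be sketched with a plain map standing in for the schema lookup (`FieldLookupCheck`, `lookupField`, and the map contents are illustrative, not part of the PR):

```java
import java.util.Map;
import java.util.Objects;

public class FieldLookupCheck {
    // Stand-in for SchemaFieldFinder.findFieldByPath: returns null for unknown paths.
    // requireNonNull turns the silent null into an immediate, descriptive failure.
    static String lookupField(Map<String, String> schema, String path) {
        return Objects.requireNonNull(
            schema.get(path), "No field found for path: " + path);
    }

    public static void main(String[] args) {
        Map<String, String> schema = Map.of("user.id", "INT64");
        System.out.println(lookupField(schema, "user.id")); // prints INT64
        // lookupField(schema, "user.name") would throw NullPointerException
        // with message "No field found for path: user.name"
    }
}
```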
```java
long totalSize = chunks.stream().mapToLong(ColumnChunkMetaData::getTotalSize).sum();
Object globalMin =
    chunks.stream()
        .map(c -> convertStatsToInternalType(primitiveType, c.getStatistics().genericGetMin()))
```
c.getStatistics().genericGetMin() returns null for all-null columns (hasNonNullValue() == false). This would NPE inside convertStatsToInternalType. Consider filtering out chunks where stats have no non-null values before the min/max aggregation.
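One way to apply the suggested filter, with a minimal `Stats` record standing in for Parquet's `Statistics` object (the record and its values are illustrative assumptions, not the PR's code):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class MinOverNonNullStats {
    // Minimal stand-in for org.apache.parquet.column.statistics.Statistics:
    // when hasNonNullValue is false, min is null (as genericGetMin() would be).
    record Stats(Long min, boolean hasNonNullValue) {}

    // Skip all-null chunks before aggregating, so null never reaches the converter.
    static Optional<Long> globalMin(List<Stats> chunks) {
        return chunks.stream()
            .filter(Stats::hasNonNullValue)
            .map(Stats::min)
            .min(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        List<Stats> chunks = List.of(
            new Stats(5L, true),
            new Stats(null, false), // all-null chunk: min is null
            new Stats(2L, true));
        System.out.println(globalMin(chunks)); // prints Optional[2]
    }
}
```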
```java
getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath));
Stream<InternalDataFile> internalDataFiles =
    getInternalDataFiles(getParquetFiles(hadoopConf, basePath), table.getReadSchema());
```
getParquetFiles() is called 3 times in getCurrentSnapshot() (lines 186, 188, 193), triggering 3 separate filesystem scans. This is inefficient and could yield inconsistent results if files change between calls. The comment on line 185 says "call the method twice" but it's actually 3 times now. Consider collecting to a list once and creating new streams from it?
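The scan-once approach can be sketched as follows; `listFiles` is a hypothetical stand-in for `getParquetFiles`, and the counter only exists to make the number of scans observable:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ScanOnce {
    static final AtomicInteger scans = new AtomicInteger();

    // Hypothetical stand-in for getParquetFiles(): each call is one filesystem scan.
    static Stream<String> listFiles() {
        scans.incrementAndGet();
        return Stream.of("a.parquet", "b.parquet");
    }

    public static void main(String[] args) {
        // Materialize the listing once, since streams are single-use...
        List<String> files = listFiles().collect(Collectors.toList());
        // ...then derive as many streams as needed from the same snapshot.
        long count = files.stream().count();
        String first = files.stream().findFirst().orElseThrow();
        System.out.println(scans.get() + " scan, " + count + " files, first=" + first);
        // prints: 1 scan, 2 files, first=a.parquet
    }
}
```

Reusing one materialized list also guarantees that all three consumers see the same set of files, which a repeated scan cannot.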
```java
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.*;
```
Nit: the wildcard import org.apache.parquet.schema.*; should be expanded into explicit imports. The explicit import org.apache.parquet.schema.MessageType on line 49 is also redundant, since the wildcard already covers it.
What is the purpose of the pull request
This adds support for more column types in the Parquet source. While adding tests for that functionality, other issues with the schema conversion were discovered and fixed.
Closes #748
Brief change log
Verify this pull request
New unit tests are added for the new functionality, and existing tests are updated to ensure good coverage of the schema conversion logic.