Parquet source: Expand column stats support, fix bugs in schema conversion#805

Open
the-other-tim-brown wants to merge 4 commits into apache:main from the-other-tim-brown:parquet-col-stats

Conversation

@the-other-tim-brown (Contributor)

What is the purpose of the pull request

This adds support for more column types in the Parquet source's column stats extraction. While adding tests for this, other issues with the schema conversion were discovered and fixed.

Closes #748

Brief change log

  • Ensures types like decimals are handled properly in column stats conversion
  • Fixes bugs in the schema conversion logic
  • Reuses a single schema when extracting the partition and column stats information, reducing the number of objects created. This also fixes an issue where nested fields did not receive the proper schema

Verify this pull request

New unit tests are added for the new functionality, and existing tests are updated to ensure good coverage of the schema conversion logic.

@@ -65,12 +65,6 @@ public static ParquetStatsExtractor getInstance() {
private static PathBasedPartitionSpecExtractor partitionSpecExtractor =

partitionValueExtractor (line 63), partitionSpecExtractor (line 65), and getMaxFromColumnStats (line 68) appear to be dead code after the removal of toInternalDataFile. These should be cleaned up along with the unused import org.apache.hadoop.fs.*; on line 32.

ColumnChunkMetaData first = chunks.get(0);
InternalField internalField =
SchemaFieldFinder.getInstance()
.findFieldByPath(internalSchema, first.getPath().toDotString());

SchemaFieldFinder.findFieldByPath() returns null if the column path doesn't exist in the schema (per its Javadoc). This null would silently propagate into the ColumnStat. Consider adding Objects.requireNonNull(internalField, "No field found for path: " + first.getPath().toDotString()).
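The suggested guard could look like the following minimal sketch. Note that `findFieldByPath` here is a hypothetical map-backed stand-in for `SchemaFieldFinder`, used only to make the null-handling pattern self-contained:

```java
import java.util.Map;
import java.util.Objects;

public class FieldLookupSketch {
  // Hypothetical stand-in for SchemaFieldFinder.findFieldByPath(): returns
  // null when the dotted path does not exist in the schema.
  static String findFieldByPath(Map<String, String> schema, String path) {
    return schema.get(path);
  }

  static String requireField(Map<String, String> schema, String path) {
    // Fail fast instead of letting a null field flow into the ColumnStat.
    return Objects.requireNonNull(
        findFieldByPath(schema, path), "No field found for path: " + path);
  }

  public static void main(String[] args) {
    Map<String, String> schema = Map.of("id", "INT32", "user.name", "STRING");
    System.out.println(requireField(schema, "user.name")); // prints STRING
  }
}
```

Failing fast with a descriptive message points directly at the mismatched column path rather than surfacing as an NPE far downstream.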

long totalSize = chunks.stream().mapToLong(ColumnChunkMetaData::getTotalSize).sum();
Object globalMin =
chunks.stream()
.map(c -> convertStatsToInternalType(primitiveType, c.getStatistics().genericGetMin()))

c.getStatistics().genericGetMin() returns null for all-null columns (hasNonNullValue() == false). This would NPE inside convertStatsToInternalType. Consider filtering out chunks where stats have no non-null values before the min/max aggregation.
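The filtering suggestion can be sketched as below. `ChunkStats` is a hypothetical stand-in for Parquet's per-chunk statistics (where `genericGetMin()` yields null for all-null chunks), kept deliberately simple so the pattern is runnable on its own:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class MinStatSketch {
  // Hypothetical stand-in for Parquet chunk statistics: min is null when the
  // chunk holds only nulls (hasNonNullValue() == false).
  record ChunkStats(Long min) {
    boolean hasNonNullValue() {
      return min != null;
    }
  }

  // Drop chunks whose statistics carry no non-null values before aggregating,
  // so a genericGetMin()-style null never reaches the type converter.
  static Optional<Long> globalMin(List<ChunkStats> chunks) {
    return chunks.stream()
        .filter(ChunkStats::hasNonNullValue)
        .map(ChunkStats::min)
        .min(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    List<ChunkStats> chunks =
        List.of(new ChunkStats(null), new ChunkStats(7L), new ChunkStats(3L));
    System.out.println(globalMin(chunks)); // prints Optional[3]
  }
}
```

Returning `Optional` also forces the caller to decide what an entirely-null column's min/max should be, instead of implicitly storing null.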

getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath));
Stream<InternalDataFile> internalDataFiles =
getInternalDataFiles(getParquetFiles(hadoopConf, basePath), table.getReadSchema());

getParquetFiles() is called 3 times in getCurrentSnapshot() (lines 186, 188, 193), triggering 3 separate filesystem scans. This is inefficient and could yield inconsistent results if files change between calls. The comment on line 185 says "call the method twice" but it's actually 3 times now. Consider collecting to a list once and creating new streams from it?
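The collect-once approach can be sketched with plain `java.util.stream`. `scanParquetFiles` here is a hypothetical stand-in for the listing call; the point is that the expensive scan runs once and each consumer gets a fresh stream over the same materialized file set:

```java
import java.util.List;

public class SnapshotSketch {
  // Hypothetical stand-in for getParquetFiles(): in the real source each call
  // triggers a fresh filesystem scan, so it should run exactly once.
  static List<String> scanParquetFiles() {
    return List.of("part-0.parquet", "part-1.parquet");
  }

  public static void main(String[] args) {
    // Materialize the scan once, then derive as many streams as needed;
    // files.stream() is cheap and the file set stays consistent across passes.
    List<String> files = scanParquetFiles();
    long count = files.stream().count();                       // first pass
    String first = files.stream().findFirst().orElse("none");  // second pass
    System.out.println(count + " files, first: " + first);
  }
}
```

This also sidesteps the fact that a `Stream` can only be consumed once, which is what forces the repeated listing calls in the current code.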

import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.*;

Nit: the wildcard import org.apache.parquet.schema.*; makes the explicit import org.apache.parquet.schema.MessageType on line 49 redundant. Consider expanding the wildcard into explicit imports instead.


Development

Successfully merging this pull request may close these issues.

Extend the List of Stat Types to be Covered by the ParquetStatsConverterUtil class