[SPARK-55568][SQL] Separate schema construction from field stats collection #54343
qlong wants to merge 3 commits into apache:master
Conversation
@cloud-fan @cashmand Can you help review, as you are the authors of the original implementation? Thanks
cashmand left a comment
Thanks, it looks good overall, but I posted a couple of concerns. I wonder if we can make the schemaRegistry a nested structure to avoid these problems, but still get the performance benefit you're seeing.
...n/scala/org/apache/spark/sql/execution/datasources/parquet/InferVariantShreddingSchema.scala
1ac24a8 to e89782e
Thanks for the review. Switched to a tree to track field stats, which gives a 1.3x to 1.5x improvement over the flat map. Overall, the new implementation is a 1.7x to 2.4x improvement.
e89782e to b0e9307
Hi @cashmand, I addressed your comments in the latest version. Can you review? Thanks
cashmand left a comment
Thanks, just a few questions.
    inArrayContext: Boolean = false): DataType = {

      // Check if this node represents an array (has "[]" child)
      val arrayChild = currentNode.children.get("[]")
In a pathological case, can [] be used as a field name? If that happened, would we incorrectly build an array instead of a struct here? If so, could we avoid this check by passing another bool to buildSchemaFromStats to indicate whether currentNode is an array?
Thanks for the review. Good call-out on [] as the type marker, I agree it is bad. I added a dedicated arrayElementNode in FieldNode to track arrays, so we no longer need to rely on any marker. Also added test cases for "[]" as a field name.
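A toy illustration of the fix (hypothetical names, not the actual Spark code): with a dedicated slot for array-element stats, an object field literally named "[]" can no longer collide with the array marker.

```scala
import scala.collection.mutable

// Hypothetical sketch, not the real FieldNode: array-element stats live in
// their own slot instead of the children map, so a field named "[]" is
// unambiguously a struct field.
case class Node(
    children: mutable.Map[String, Node] = mutable.Map.empty,
    var arrayElement: Option[Node] = None)

val root = Node()
root.children("[]") = Node()      // a real object field named "[]"
root.arrayElement = Some(Node())  // array-element tracking, kept separate

assert(root.children.contains("[]"))
assert(root.arrayElement.isDefined)
```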
    case Type.UUID => VariantType
    // Node for tree-based field tracking
    private case class FieldNode(
        var dataType: DataType,
Can you comment on what dataType will be for structs and arrays?
dataType is the type summary:
- For a struct, it is StructType(Seq.empty); the actual schema comes from the children.
- For an array, it is ArrayType(NullType); the actual schema comes from the new arrayElementNode.
- For primitives, dataType is the merged Scala type.
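A self-contained sketch of that summary, with minimal stand-ins for Spark's DataType hierarchy (the real FieldNode lives in InferVariantShreddingSchema.scala and differs in detail):

```scala
import scala.collection.mutable

// Minimal stand-ins for Spark's DataType hierarchy, just for this sketch.
sealed trait DataType
case object NullType extends DataType
case object LongType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class StructType(fields: Seq[String]) extends DataType

case class FieldNode(
    var dataType: DataType,
    var rowCount: Long = 0L,
    var lastSeenRow: Int = -1,
    children: mutable.Map[String, FieldNode] = mutable.Map.empty,
    var arrayElementNode: Option[FieldNode] = None)

val structNode = FieldNode(StructType(Seq.empty)) // real schema comes from children
val arrayNode  = FieldNode(ArrayType(NullType))   // real schema comes from arrayElementNode
val primNode   = FieldNode(LongType)              // merged primitive type

assert(structNode.dataType == StructType(Seq.empty))
assert(arrayNode.dataType == ArrayType(NullType))
```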
    // Use "[]" as special child key for array elements
    val arrayNode = currentNode.getOrCreateChild("[]")

    // Track distinct row count for the array field itself
Why is this check for arrayNode.lastSeenRow != rowIdx needed? If we're not in an array context, what's the case where we'd see the same rowIdx twice?
The check for arrayNode is needed for a row with a nested array like [[1], [2]]. The outer array node will be visited multiple times while we iterate the elements at line 429. The check prevents inflating rowCount.
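The deduplication described here can be sketched in isolation (recordAppearance is an illustrative helper, not the PR's API): rowCount counts distinct rows, and lastSeenRow guards against counting the same row twice when a nested array visits a node once per element.

```scala
// Sketch of per-row deduplication via lastSeenRow.
final class Stats { var rowCount = 0L; var lastSeenRow = -1 }

def recordAppearance(stats: Stats, rowIdx: Int): Unit = {
  if (stats.lastSeenRow != rowIdx) {
    stats.rowCount += 1
    stats.lastSeenRow = rowIdx
  }
}

val outerArray = new Stats
// Row 0 contains [[1], [2]]: the outer array node is visited twice,
// once per inner element, but must count as a single row.
recordAppearance(outerArray, 0)
recordAppearance(outerArray, 0)
assert(outerArray.rowCount == 1)

// Row 1 contains another array; now two distinct rows are counted.
recordAppearance(outerArray, 1)
assert(outerArray.rowCount == 2)
```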
    testWithTempDir("infer shredding key as data") { dir =>
      // The first 10 fields in each object include the row ID in the field name, so they'll be
      // unique. Because we impose a 1000-field limit when building up the schema, we'll end up
Is the 1000 field limit no longer enforced at all? The intent of the limit was to avoid building up a huge intermediate schema if all of the variant values have distinct fields. I think the new approach could still result in a fairly large statistics tree in this situation, right?
I don't think this is necessarily critical - the overall memory and time should still be bounded by the size of the variants, which is enforced elsewhere to not get too large. But if we're going to remove this limit, I want to make sure it's a conscious decision.
You are right that the tree can grow beyond 1000 fields during stats collection. I changed it to ensure that the selected fields are the true top N by cardinality. The memory footprint of the stats tree is proportional to the number of unique fields, so it should be much smaller than the variant data itself.
…ection

Variant shredding schema inference is expensive and can take over 100ms per file. Replace fold-based schema merging with deferred schema construction using single-pass field statistics collection.

Previous approach:
- Used foldLeft to build and merge complete schemas for each row
- Merged schemas repeatedly across 4096 rows
- High allocation overhead from recursive schema construction

New approach:
- Separate schema construction from field statistics collection to avoid excessive intermediate allocations and repeated merges
- Single-pass field traversal with a flat statistics registry to track field types and row counts
- Use lastSeenRow for deduplication
- Defer schema construction until after all rows are processed

Performance vs master:
- Tested with scenarios covering different field counts, array sizes, and batch sizes (1-4096 rows, 10-200 fields, varying nesting depths and sparsity patterns)
- Average 1.5x speedup across test scenarios
- 1.5x-1.6x faster on array-heavy workloads
- 11.5x faster on sparse data (10% field presence)
- Consistent performance across multiple runs
- 96% of tests show improvement

All existing unit tests pass.

Issue: https://issues.apache.org/jira/browse/SPARK-55568
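The two phases in this commit message can be sketched on flat string-keyed rows instead of real variant values (inferTopFields and Stats are illustrative names, not the PR's API):

```scala
import scala.collection.mutable

// Greatly simplified two-phase sketch of "collect stats first, build later".
final class Stats { var rowCount = 0L; var lastSeenRow = -1 }

def inferTopFields(rows: Seq[Map[String, Any]], topN: Int): Seq[String] = {
  val registry = mutable.Map.empty[String, Stats]
  // Phase 1: single-pass statistics collection; no schema objects allocated.
  rows.zipWithIndex.foreach { case (row, rowIdx) =>
    row.keys.foreach { field =>
      val s = registry.getOrElseUpdate(field, new Stats)
      if (s.lastSeenRow != rowIdx) { s.rowCount += 1; s.lastSeenRow = rowIdx }
    }
  }
  // Phase 2: deferred construction from the collected stats, top N by
  // cardinality, then alphabetical for determinism.
  registry.toSeq
    .sortBy { case (name, s) => (-s.rowCount, name) }
    .take(topN)
    .map(_._1)
    .sorted
}

val rows = Seq(Map("a" -> 1, "b" -> 2), Map("a" -> 3), Map("c" -> 4))
assert(inferTopFields(rows, 2) == Seq("a", "b"))
```

The contrast with the previous approach is that no per-row schema is ever built or merged; the only per-row work is updating counters in the registry.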
…ection

Switch to a tree structure for tracking field stats, as suggested by @cashmand.

Performance improvements:
- 1.3x to 1.5x faster compared to the flat map
- 1.7x to 2.4x faster compared to the original implementation

Other changes:
- Ensure top-cardinality fields are included in the schema by sorting by cardinality before taking the top N.
- Add tests for special characters and mixed special characters, as suggested by @cashmand.
…ection

Address PR feedback:
- Make array-element tracking explicit with a dedicated arrayElementNode to avoid ambiguity with "[]" field names
- Add tests for field names "[]"
e580e01 to 282bc8f
        var lastSeenRow: Int = -1, // Last row index that incremented rowCount
        var arrayElementCount: Long = 0, // Total occurrences across all array elements
        children: mutable.Map[String, FieldNode] = mutable.Map.empty,
        var arrayElementNode: Option[FieldNode] = None
Added a field to track arrays, instead of relying on the "[]" marker.
    // Get all direct children, filter by cardinality, sort by cardinality descending,
    // take top N, then sort alphabetically for determinism.
    val maxStructSize = Math.min(1000, maxShreddedFieldsPerFile)
This is to limit candidates per struct node; the final file-level limit is enforced in finalizeSimpleSchema.
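The selection rule in the quoted comment, as a standalone sketch (selectChildren is a hypothetical helper): rank children by cardinality, cap at the per-struct limit, then re-sort alphabetically so the resulting schema is deterministic.

```scala
// Sketch of per-struct top-N selection by cardinality.
def selectChildren(
    cardinality: Map[String, Long],
    maxStructSize: Int): Seq[String] = {
  cardinality.toSeq
    .sortBy { case (name, count) => (-count, name) }  // highest cardinality first
    .take(maxStructSize)                              // per-struct cap
    .map(_._1)
    .sorted                                           // alphabetical for determinism
}

val picked = selectChildren(Map("x" -> 5L, "y" -> 9L, "z" -> 1L), 2)
assert(picked == Seq("x", "y"))  // y and x win on cardinality, then sorted
```

Sorting before taking the top N is what guarantees the selected fields are the true top N, rather than whichever fields happened to be seen first.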
Why are the changes needed?
Variant shredding schema inference is expensive and can take over 100ms per file. Replace fold-based schema merging with deferred schema construction using single-pass field statistics collection.
Previous approach: foldLeft-based construction and merging of complete schemas per row, with repeated merges across up to 4096 rows.
New approach: single-pass field statistics collection with deferred schema construction, using lastSeenRow for deduplication.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Functional test: all existing unit tests pass.
Performance vs master: average 1.5x speedup across test scenarios; up to 11.5x faster on sparse data (see commit messages for details).
Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Sonnet 4.5