
[SPARK-55568][SQL] Separate schema construction from field stats collection#54343

Open
qlong wants to merge 3 commits into apache:master from qlong:SPARK-55568-optimize-variant-schema-inference

Conversation


@qlong qlong commented Feb 17, 2026

Why are the changes needed?

Variant shredding schema inference is expensive and can take over 100ms per file. Replace fold-based schema merging with deferred schema construction using single-pass field statistics collection.

Previous approach:

  • Used foldLeft to build and merge complete schemas for each row
  • Merged schemas repeatedly across 4096 rows
  • High allocation overhead from recursive schema construction

New approach:

  • Separates schema construction from field statistics collection to avoid excessive intermediate allocations and repeated merges
  • Performs a single-pass field traversal with a field statistics tree that tracks field types and row counts
  • Uses lastSeenRow for deduplication
  • Defers schema construction until all rows are processed
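The stats-collection step above can be sketched in isolation (a minimal, hypothetical Scala sketch; FieldNode, lastSeenRow, and getOrCreateChild mirror names from the PR, but this standalone version uses plain type-name strings instead of Spark DataTypes, and the type merge is simplified):

```scala
import scala.collection.mutable

// One node per field path in the statistics tree.
case class FieldNode(
    var dataType: String = "null",  // merged type summary (simplified here)
    var rowCount: Int = 0,          // number of distinct rows containing this path
    var lastSeenRow: Int = -1,      // last row index that incremented rowCount
    children: mutable.Map[String, FieldNode] = mutable.Map.empty) {

  def getOrCreateChild(name: String): FieldNode =
    children.getOrElseUpdate(name, FieldNode())

  // Count this node at most once per row, even if the same path is
  // visited multiple times within that row (e.g. inside arrays).
  def touch(rowIdx: Int): Unit =
    if (lastSeenRow != rowIdx) {
      rowCount += 1
      lastSeenRow = rowIdx
    }
}

// Single pass over all rows: record statistics only; no schema objects
// are allocated or merged here.
def collectStats(rows: Seq[Map[String, String]], root: FieldNode): Unit =
  rows.zipWithIndex.foreach { case (row, rowIdx) =>
    row.foreach { case (field, tpe) =>
      val node = root.getOrCreateChild(field)
      node.touch(rowIdx)
      node.dataType = tpe // the real code merges types; simplified here
    }
  }
```

Schema construction then happens once, after the pass, by walking the finished tree instead of merging per-row schemas.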

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Functional tests:

  • All existing unit tests pass

Performance vs master:

  • Tested scenarios with different field counts, array sizes, and batch sizes (1-4096 rows, 10-200 fields, varying nesting depths and sparsity patterns).
  • 1.7x to 2.4x speedup across test scenarios
  • Consistent performance across multiple runs
  • 96% of tests show improvement

Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Sonnet 4.5

@qlong
Author

qlong commented Feb 25, 2026

@cloud-fan @cashmand Can you help review, as you are the authors of the original implementation? Thanks

Contributor

@cashmand cashmand left a comment


Thanks, it looks good overall, but I posted a couple of concerns. I wonder if we can make the schemaRegistry a nested structure to avoid these problems, but still get the performance benefit you're seeing.

@qlong qlong force-pushed the SPARK-55568-optimize-variant-schema-inference branch from 1ac24a8 to e89782e on February 26, 2026 16:42
@qlong
Author

qlong commented Feb 26, 2026

Thanks, it looks good overall, but I posted a couple of concerns. I wonder if we can make the schemaRegistry a nested structure to avoid these problems, but still get the performance benefit you're seeing.

Thanks for the review. Switched to a tree to track field stats, which gives a 1.3x to 1.5x improvement over the flat map. Overall, the new implementation is a 1.7x to 2.4x improvement.

@qlong qlong force-pushed the SPARK-55568-optimize-variant-schema-inference branch from e89782e to b0e9307 on February 26, 2026 20:21
@qlong qlong requested a review from cashmand February 26, 2026 22:16
@qlong
Author

qlong commented Mar 6, 2026

Hi @cashmand, I addressed your comments in the latest version. Can you review? Thanks

Contributor

@cashmand cashmand left a comment


Thanks, just a few questions.

inArrayContext: Boolean = false): DataType = {

// Check if this node represents an array (has "[]" child)
val arrayChild = currentNode.children.get("[]")
Contributor


In a pathological case, can [] be used as a field name? If that happened, would we incorrectly build an array instead of a struct here? If so, could we avoid this check by passing another bool to buildSchemaFromStats to indicate whether currentNode is an array?

Author


Thanks for the review. Good callout on "[]" as the type marker; I agree it is fragile. I added a dedicated arrayElementNode in FieldNode to track arrays, so we no longer need to rely on any marker. Also added test cases for "[]" as a field name.

case Type.UUID => VariantType
// Node for tree-based field tracking
private case class FieldNode(
var dataType: DataType,
Contributor


Can you comment on what dataType will be for structs and arrays?

Author


dataType is the type summary:

  • For structs, it is StructType(Seq.empty); the actual schema comes from children
  • For arrays, it is ArrayType(NullType); the actual schema comes from the new arrayElementNode
  • For primitives, dataType is the merged scalar type
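As a standalone illustration of that summary (a hedged sketch, not the PR's actual code: types are rendered as plain strings rather than Spark DataTypes, though the node and field names mirror the PR):

```scala
import scala.collection.mutable

// Simplified stats node: dataType is a placeholder for containers and the
// merged type summary for primitives.
case class FieldNode(
    var dataType: String = "null",
    children: mutable.Map[String, FieldNode] = mutable.Map.empty,
    var arrayElementNode: Option[FieldNode] = None)

def buildSchemaFromStats(node: FieldNode): String = node.arrayElementNode match {
  case Some(elem) =>
    // Array: the node's dataType is only a placeholder (ArrayType(NullType)
    // in the PR); the element schema comes from the dedicated arrayElementNode.
    s"array<${buildSchemaFromStats(elem)}>"
  case None if node.children.nonEmpty =>
    // Struct: dataType is a placeholder (StructType(Seq.empty) in the PR);
    // the real schema is assembled from the children.
    node.children.toSeq.sortBy(_._1)
      .map { case (name, child) => s"$name: ${buildSchemaFromStats(child)}" }
      .mkString("struct<", ", ", ">")
  case None =>
    node.dataType // primitive: the merged type summary itself
}
```

For example, a node with a primitive child "a" and an array child "b" of strings renders as struct<a: long, b: array<string>>.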

// Use "[]" as special child key for array elements
val arrayNode = currentNode.getOrCreateChild("[]")

// Track distinct row count for the array field itself
Contributor


Why is this check for arrayNode.lastSeenRow != rowIdx needed? If we're not in an array context, what's the case where we'd see the same rowIdx twice?

Author


The check for arrayNode is needed for a row with a nested array like [[1], [2]]. The outer array node will be visited multiple times while we iterate over the elements at line 429. The check prevents inflating rowCount.
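A tiny standalone illustration of this point (a hypothetical walk, not the PR's code):

```scala
// Walking the nested array [[1], [2]] reaches the inner-array stats node
// once per outer element, i.e. twice within the same row. The lastSeenRow
// guard keeps rowCount per-row, not per-visit.
final class Node {
  var rowCount = 0
  var lastSeenRow = -1
  def touch(rowIdx: Int): Unit =
    if (lastSeenRow != rowIdx) { rowCount += 1; lastSeenRow = rowIdx }
}

val innerArrayNode = new Node
val rowIdx = 0
Seq(Seq(1), Seq(2)).foreach(_ => innerArrayNode.touch(rowIdx)) // two visits, one row
assert(innerArrayNode.rowCount == 1) // counted once despite two visits
```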


testWithTempDir("infer shredding key as data") { dir =>
// The first 10 fields in each object include the row ID in the field name, so they'll be
// unique. Because we impose a 1000-field limit when building up the schema, we'll end up
Contributor


Is the 1000 field limit no longer enforced at all? The intent of the limit was to avoid building up a huge intermediate schema if all of the variant values have distinct fields. I think the new approach could still result in a fairly large statistics tree in this situation, right?

I don't think this is necessarily critical - the overall memory and time should still be bounded by the size of the variants, which is enforced elsewhere to not get too large. But if we're going to remove this limit, I want to make sure it's a conscious decision.

Author


You are right that the tree can grow beyond 1000 fields during stats collection. I changed it to ensure that the selected fields are the true top N by cardinality. The memory footprint of the stats tree is proportional to the number of unique fields, so it should be much smaller than the variant data itself.
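The selection described here can be sketched as follows (an illustrative helper; selectTopFields and the sample counts are invented for this example and are not the PR's actual code):

```scala
// Sort candidates by cardinality descending (ties broken by name), take the
// top N, then sort the survivors alphabetically so the resulting schema is
// deterministic across runs.
def selectTopFields(counts: Map[String, Long], limit: Int): Seq[String] =
  counts.toSeq
    .sortBy { case (name, count) => (-count, name) }
    .take(limit)
    .map(_._1)
    .sorted

// With a limit of 2, the low-cardinality field is dropped regardless of
// alphabetical order:
// selectTopFields(Map("rare" -> 3L, "common" -> 100L, "mid" -> 40L), 2)
//   == Seq("common", "mid")
```

Sorting by cardinality first is what guarantees the true top N; the final alphabetical sort only affects field ordering, not which fields survive.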

qlong added 3 commits March 12, 2026 15:21
…ection

Variant shredding schema inference is expensive and can take over 100ms
per file. Replace fold-based schema merging with deferred schema
construction using single-pass field statistics collection.

Previous approach:
- Used foldLeft to build and merge complete schemas for each row
- Merged schemas repeatedly across 4096 rows
- High allocation overhead from recursive schema construction

New approach:
- Separate schema construction from field statistics collection to avoid
  excessive intermediate allocations and repeated merges.
- Single-pass field traversal with flat statistics registry to track
  field types and row counts
- Using lastSeenRow for deduplication
- Defers schema construction until after all rows processed

Performance vs master:
- Tested scenarios with different field counts, array sizes, and
  batch sizes (1-4096 rows, 10-200 fields, varying nesting depths and
  sparsity patterns).
- Average 1.5x speedup across test scenarios
- 1.5x-1.6x faster on array-heavy workloads
- 11.5x faster on sparse data (10% field presence)
- Consistent performance across multiple runs
- 96% of tests show improvement

All existing unit tests pass.

Issue: https://issues.apache.org/jira/browse/SPARK-55568
…ection

Switch to tree structure for tracking field stats as suggested by
@cashmand.

Performance improvements
- 1.3x to 1.5x faster compared to flat map
- 1.7x to 2.4x faster compared to the original implementation

Other changes:
- Ensure top cardinality fields are included in the schema by sorting
  by cardinality first before taking the top N.
- Add special-character tests, including tests for mixed
  special characters, as suggested by @cashmand.
…ection

Address PR feedback:
- Make array-element tracking explicit with a dedicated arrayElementNode
  to avoid ambiguity with "[]" field names
- Add tests for "[]" as a field name
@qlong qlong force-pushed the SPARK-55568-optimize-variant-schema-inference branch from e580e01 to 282bc8f on March 12, 2026 19:21

var lastSeenRow: Int = -1, // Last row index that incremented rowCount
var arrayElementCount: Long = 0, // Total occurrences across all array elements
children: mutable.Map[String, FieldNode] = mutable.Map.empty,
var arrayElementNode: Option[FieldNode] = None
Author


Added a dedicated field to track arrays, instead of relying on the "[]" marker.



// Get all direct children, filter by cardinality, sort by cardinality descending,
// take top N, then sort alphabetically for determinism.
val maxStructSize = Math.min(1000, maxShreddedFieldsPerFile)
Author


This limits the candidates per struct node; the final file-level limit is enforced in finalizeSimpleSchema.

@qlong qlong requested a review from cashmand March 13, 2026 01:55