
[SPARK-55628][SS] Integrate stream-stream join state format V4#54777

Open
nicholaschew11 wants to merge 6 commits into apache:master from nicholaschew11:spark-55628-v4-join-integration

Conversation


@nicholaschew11 nicholaschew11 commented Mar 12, 2026

What changes were proposed in this pull request?

Integrate stream-stream join state format V4 which uses timestamp-based indexing with a secondary index.
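The timestamp-plus-secondary-index layout can be illustrated with a simplified in-memory model. This is entirely hypothetical (the real stores are RocksDB-backed column families, and join state holds multiple values per key); all class and method names below are illustrative, not the actual Spark classes:

```scala
import scala.collection.mutable

// Simplified model of the V4 layout: a primary map keyed by join key,
// plus a secondary index ordered by event time so eviction below the
// watermark is a range scan instead of a full scan. Unlike real join
// state, this model keeps only one value per key.
class TimestampIndexedStore {
  private val primary = mutable.Map.empty[String, (Long, String)]    // key -> (eventTimeUs, value)
  private val byTime = mutable.TreeMap.empty[(Long, String), Unit]   // (eventTimeUs, key) secondary index

  def put(key: String, eventTimeUs: Long, value: String): Unit = {
    // Keep the secondary index consistent with the primary store.
    primary.get(key).foreach { case (oldTs, _) => byTime.remove((oldTs, key)) }
    primary(key) = (eventTimeUs, value)
    byTime((eventTimeUs, key)) = ()
  }

  // Evict everything strictly below the watermark (in microseconds),
  // returning the evicted keys.
  def evictBelow(watermarkUs: Long): Seq[String] = {
    val expired = byTime.keys.takeWhile(_._1 < watermarkUs).toSeq
    expired.foreach { case (ts, key) =>
      byTime.remove((ts, key))
      primary.remove(key)
    }
    expired.map(_._2)
  }
}
```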

Key changes:

  • Enable V4 in STREAMING_JOIN_STATE_FORMAT_VERSION config
  • Gate V4 behind spark.sql.streaming.join.stateFormatV4.enabled while it is under development
  • Route V4 to use VCF (stateFormatVersion >= 3) and hardcode schema version 3 for VCF path
  • Fix checkpoint ID routing for V4's single-store design
  • Fix append() to extract event time from join key (window struct) before falling back to value row
  • Add StructType fallback in key event-time detection for windowed joins where only one side has watermark metadata
  • Mark V4's secondary index (TsWithKeyStore) as isInternal = true to prevent double-counting in numRowsTotal metrics
  • Convert watermark from milliseconds to microseconds at all 4 eviction call sites (V4 stores timestamps as TimestampType)
  • Add TimestampAsPostfixKeyStateEncoderSpec and TimestampAsPrefixKeyStateEncoderSpec to KeyStateEncoderSpec.fromJson for checkpoint restart deserialization
  • Add V4 branch in getSchemaForStateStores and getSchemasForStateStoreWithColFamily for correct column family schemas and encoder specs
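The millisecond-to-microsecond conversion mentioned above can be sketched as follows. The helper name is hypothetical; the actual conversion is inlined at the four eviction call sites in the join operator:

```scala
// Sketch of the ms -> us watermark conversion. V4 stores event times
// as TimestampType, which Spark represents internally as microseconds
// since the epoch, while the operator tracks the watermark in
// milliseconds, so the watermark must be scaled before eviction.
object WatermarkConversion {
  def watermarkMsToMicros(watermarkMs: Long): Long = {
    // Guard against Long overflow before scaling.
    require(watermarkMs <= Long.MaxValue / 1000L, "watermark too large")
    watermarkMs * 1000L
  }
}
```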

Why are the changes needed?

SPARK-55628 tracks the integration of V4 state format into the stream-stream join operator. V4 was implemented in SPARK-55144 but not yet wired into the operator.

Does this PR introduce any user-facing change?

No. V4 is gated behind an internal config (spark.sql.streaming.join.stateFormatVersion=4, default remains 2). V4 is marked as experimental and subject to change.
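For development purposes, opting in would look roughly like this (assuming an active SparkSession named `spark`; both config keys are taken from this PR's description, but whether they must be set together is an assumption):

```scala
// Dev-only opt-in sketch; not intended for production use.
spark.conf.set("spark.sql.streaming.join.stateFormatV4.enabled", "true")
spark.conf.set("spark.sql.streaming.join.stateFormatVersion", "4")
```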

How was this patch tested?

  • Added StreamingJoinV4Suite.scala with 4 new test suites: StreamingInnerJoinV4Suite, StreamingOuterJoinV4Suite, StreamingFullOuterJoinV4Suite, StreamingLeftSemiJoinV4Suite
  • All suites re-run existing join tests with V4 config via TestWithV4StateFormat trait
  • 2 V4-specific tests: plan assertion (verifies stateFormatVersion == 4 in execution plan) and schema validation (verifies correct column families and encoder specs)
  • 94/94 tests pass across all 4 suites
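The mixin pattern described above could look roughly like this. Only `TestWithV4StateFormat` is named by the PR; the base trait and member names are hypothetical, and the real trait would hook into Spark's test-suite config machinery rather than a plain `Map`:

```scala
// Illustrative pattern: a mixin trait that forces extra SQL confs for
// every test in the suites it is mixed into, so existing join suites
// can be re-run unchanged against the V4 state format.
trait WithExtraConfs {
  def extraConfs: Map[String, String] = Map.empty
}

trait TestWithV4StateFormat extends WithExtraConfs {
  override def extraConfs: Map[String, String] =
    super.extraConfs ++ Map(
      "spark.sql.streaming.join.stateFormatVersion" -> "4",
      "spark.sql.streaming.join.stateFormatV4.enabled" -> "true")
}
```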

Was this patch authored or co-authored using generative AI tooling?

Yes



nicholaschew11 commented Mar 12, 2026

cc @HeartSaVioR

eason-yuchen-liu (Contributor) left a comment:

Thanks for the change!
nit: Should we label this feature as experimental? IIUC, this state format is subject to change, and users should not be using it in production.


HeartSaVioR commented Mar 12, 2026

Yeah, let's avoid making this the default and have an "internal" config as an opt-in at this point. We still need some more changes which might trigger storage-related changes. We should be the only user (to develop more) until we call it done.

@nicholaschew11 Could you please incorporate this? Thanks!

HeartSaVioR commented:

I'm not sure speaking of "experimental" can justify breaking compatibility over time. We should just be more aggressive: block users from using it and keep it open only for dev purposes. I hope we complete the work in the Spark 4.2 time frame, which would remove the concern, but let's prepare for the case where it doesn't happen.
