[SPARK-55628][SS] Integrate stream-stream join state format V4#54777
nicholaschew11 wants to merge 6 commits into apache:master
cc @HeartSaVioR
eason-yuchen-liu
left a comment
Thanks for the change!
nit: Should we label this feature as experimental? IIUC, this state format is subject to change, and users should not be using it in production.
Yeah, let's avoid making this the default and instead have an "internal" config as an "opt-in" to use it at this point. We still need some more changes, which might trigger storage-related changes. We should be the only users (for further development) until we call it done. @nicholaschew11 Could you please incorporate this? Thanks!
I'm not sure that calling it "experimental" can justify breaking compatibility over time. We should just be more aggressive: block users from using it and keep it open for dev purposes only. I hope we complete the work in the Spark 4.2 time frame, which would remove the concern, but let's prepare for the case where that doesn't happen.
What changes were proposed in this pull request?
Integrate stream-stream join state format V4 which uses timestamp-based indexing with a secondary index.
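For readers unfamiliar with the idea, the following is a minimal sketch of timestamp-based indexing with a secondary index, written as a toy Python model. It is not Spark's implementation and does not reflect the actual V4 storage layout. The class and method names are hypothetical, and it assumes integer event times and mutually comparable join keys. The primary structure holds rows per (key, event time), while a sorted secondary index of (event time, key) pairs lets eviction find expired rows without scanning every key.

```python
import bisect
from collections import defaultdict


class ToyTimestampIndexedState:
    """Toy model of one side of a join buffer (hypothetical, not Spark code)."""

    def __init__(self):
        # Primary store: join key -> event time -> buffered values.
        self._rows = defaultdict(lambda: defaultdict(list))
        # Secondary index: sorted, de-duplicated (event_time, key) pairs.
        self._ts_index = []

    def append(self, key, event_time, value):
        self._rows[key][event_time].append(value)
        entry = (event_time, key)
        pos = bisect.bisect_left(self._ts_index, entry)
        # Insert the index entry only once per (event_time, key) pair.
        if pos == len(self._ts_index) or self._ts_index[pos] != entry:
            self._ts_index.insert(pos, entry)

    def get(self, key):
        # Return all buffered values for a key, ordered by event time.
        by_time = self._rows.get(key, {})
        return [v for ts in sorted(by_time) for v in by_time[ts]]

    def evict_older_than(self, watermark):
        """Drop rows with event_time < watermark, guided by the index."""
        # (watermark,) sorts before any (watermark, key) tuple, so `cut`
        # is the position of the first entry with event_time >= watermark.
        cut = bisect.bisect_left(self._ts_index, (watermark,))
        expired, self._ts_index = self._ts_index[:cut], self._ts_index[cut:]
        removed = 0
        for event_time, key in expired:
            removed += len(self._rows[key].pop(event_time))
            if not self._rows[key]:
                del self._rows[key]
        return removed

    def num_rows_total(self):
        # Count primary rows only; index entries are bookkeeping, so
        # counting them as well would report each row twice.
        return sum(
            len(values)
            for by_time in self._rows.values()
            for values in by_time.values()
        )


state = ToyTimestampIndexedState()
state.append("a", 10, "a@10")
state.append("b", 5, "b@5")
state.append("a", 3, "a@3")
removed = state.evict_older_than(6)
```

In this toy model, eviction touches only the expired index prefix, and the row count deliberately ignores the index so each buffered row is counted once.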
Key changes:
- … `STREAMING_JOIN_STATE_FORMAT_VERSION` config
- … `spark.sql.streaming.join.stateFormatV4.enabled` while V4 is under development.
- … (`stateFormatVersion >= 3`) and hardcode schema version 3 for VCF path
- … `append()` to extract event time from join key (window struct) before falling back to value row
- … (`TsWithKeyStore`) as `isInternal = true` to prevent double-counting in `numRowsTotal` metrics
- … (`TimestampType`)
- … `TimestampAsPostfixKeyStateEncoderSpec` and `TimestampAsPrefixKeyStateEncoderSpec` to `KeyStateEncoderSpec.fromJson` for checkpoint restart deserialization
- … `getSchemaForStateStores` and `getSchemasForStateStoreWithColFamily` for correct column family schemas and encoder specs
Why are the changes needed?
SPARK-55628 tracks the integration of V4 state format into the stream-stream join operator. V4 was implemented in SPARK-55144 but not yet wired into the operator.
Does this PR introduce any user-facing change?
No. V4 is gated behind an internal config (`spark.sql.streaming.join.stateFormatVersion=4`, default remains 2). V4 is marked as experimental and subject to change.
How was this patch tested?
- … `StreamingJoinV4Suite.scala` with 4 new test suites: `StreamingInnerJoinV4Suite`, `StreamingOuterJoinV4Suite`, `StreamingFullOuterJoinV4Suite`, `StreamingLeftSemiJoinV4Suite`
- … `TestWithV4StateFormat` trait
- … `stateFormatVersion == 4` in execution plan) and schema validation (verifies correct column families and encoder specs)
Was this patch authored or co-authored using generative AI tooling?
Yes
Behavioral Change Information