From d0ec5be7367c0a344014f595e949fd32cf8f69e6 Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 12:02:51 -0700 Subject: [PATCH 1/7] [SPARK-55628][SS] Integrate stream-stream join state format V4 --- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../join/StreamingSymmetricHashJoinExec.scala | 29 ++- .../join/SymmetricHashJoinStateManager.scala | 90 ++++--- .../streaming/state/StateStore.scala | 4 + .../sql/streaming/StreamingJoinSuite.scala | 2 +- .../sql/streaming/StreamingJoinV4Suite.scala | 225 ++++++++++++++++++ 6 files changed, 312 insertions(+), 42 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8c9796b716896..319fe39a165c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3136,9 +3136,7 @@ object SQLConf { "1 is DEPRECATED and should not be explicitly set by users.") .version("3.0.0") .intConf - // TODO: [SPARK-55628] Add version 4 once we integrate the state format version 4 into - // stream-stream join operator. - .checkValue(v => Set(1, 2, 3).contains(v), "Valid versions are 1, 2, and 3") + .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4") .createWithDefault(2) val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala index d8ad576bb68a1..95acff230269c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala @@ -198,7 +198,7 @@ case class StreamingSymmetricHashJoinExec( private val allowMultipleStatefulOperators = conf.getConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE) - private val useVirtualColumnFamilies = stateFormatVersion == 3 + private val useVirtualColumnFamilies = stateFormatVersion >= 3 // Determine the store names and metadata version based on format version private val (numStoresPerPartition, _stateStoreNames, _operatorStateMetadataVersion) = @@ -292,8 +292,12 @@ case class StreamingSymmetricHashJoinExec( val info = getStateInfo val stateSchemaDir = stateSchemaDirPath() + // V4 uses VCF like V3, which requires schema version 3. The stateSchemaVersion + // parameter may carry the stateFormatVersion (e.g. 4) from IncrementalExecution, + // so we hardcode 3 here for the VCF path. + val effectiveSchemaVersion = 3 validateAndWriteStateSchema( - hadoopConf, batchId, stateSchemaVersion, info, stateSchemaDir, session + hadoopConf, batchId, effectiveSchemaVersion, info, stateSchemaDir, session ) } else { var result: Map[String, (StructType, StructType)] = Map.empty @@ -437,7 +441,7 @@ case class StreamingSymmetricHashJoinExec( removedRowIter.filterNot { kv => stateFormatVersion match { case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value)) - case 2 | 3 => kv.matched + case 2 | 3 | 4 => kv.matched case _ => throwBadStateFormatVersionException() } }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight)) @@ -463,7 +467,7 @@ case class StreamingSymmetricHashJoinExec( removedRowIter.filterNot { kv => stateFormatVersion match { case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, kv.value)) - case 2 | 3 => kv.matched + case 2 | 3 | 4 => kv.matched case _ => throwBadStateFormatVersionException() } }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) @@ -479,7 +483,7 @@ case class StreamingSymmetricHashJoinExec( case FullOuter => lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) => stateFormatVersion match { - case 2 | 3 => kv.matched + case 2 | 3 | 4 => kv.matched case _ => throwBadStateFormatVersionException() } @@ -801,7 +805,7 @@ case class StreamingSymmetricHashJoinExec( s.evictByKeyCondition(stateKeyWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictByTimestamp(stateWatermark) + s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) => joinStateManager match { @@ -809,7 +813,7 @@ case class StreamingSymmetricHashJoinExec( s.evictByValueCondition(stateValueWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictByTimestamp(stateWatermark) + s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case _ => 0L } @@ -833,7 +837,7 @@ case class StreamingSymmetricHashJoinExec( s.evictAndReturnByKeyCondition(stateKeyWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictAndReturnByTimestamp(stateWatermark) + s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) => joinStateManager match { @@ -841,12 +845,19 @@ case class StreamingSymmetricHashJoinExec( s.evictAndReturnByValueCondition(stateValueWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictAndReturnByTimestamp(stateWatermark) + s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case _ => Iterator.empty } } + /** + * V4 stores timestamps in microseconds (TimestampType) while the watermark + * is tracked in milliseconds. Convert ms to microseconds for eviction calls. + */ + private def watermarkMsToStateTimestamp(watermarkMs: Long): Long = + watermarkMs * 1000 + /** Commit changes to the buffer state */ def commitState(): Unit = { joinStateManager.commit() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala index 9aa41e1966591..dbc8598bf4366 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala @@ -233,6 +233,10 @@ class SymmetricHashJoinStateManagerV4( joinKeys.zipWithIndex.collectFirst { case (ne: NamedExpression, index) if ne.metadata.contains(EventTimeWatermark.delayKey) => index + }.orElse { + keyAttributes.zipWithIndex.collectFirst { + case (attr, index) if attr.dataType.isInstanceOf[StructType] => index + } } } @@ -252,9 +256,9 @@ class SymmetricHashJoinStateManagerV4( Seq(StructField("dummy", NullType, nullable = true)) ) - // TODO: [SPARK-55628] Below two fields need to be handled properly during integration with - // the operator. - private val stateStoreCkptId: Option[String] = None + // V4 uses a single store with VCFs (not separate keyToNumValues/keyWithIndexToValue stores). + // Use the keyToNumValues checkpoint ID for loading the correct committed version. + private val stateStoreCkptId: Option[String] = keyToNumValuesStateStoreCkptId private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None private var stateStoreProvider: StateStoreProvider = _ @@ -313,7 +317,8 @@ class SymmetricHashJoinStateManagerV4( private val tsWithKey = new TsWithKeyTypeStore override def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean): Unit = { - val eventTime = extractEventTimeFn(value) + val eventTime = extractEventTimeFnFromKey(key) + .getOrElse(extractEventTimeFn(value)) // We always do blind merge for appending new value. keyWithTsToValues.append(key, eventTime, value, matched) tsWithKey.add(eventTime, key) @@ -496,7 +501,7 @@ class SymmetricHashJoinStateManagerV4( private val attachTimestampProjection: UnsafeProjection = TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema) - // Create the specific column family in the store for this join side's KeyWithIndexToValueStore + // Create the specific column family in the store for this join side's KeyWithTsToValuesStore. stateStore.createColFamilyIfAbsent( colFamilyName, keySchema, @@ -648,13 +653,15 @@ class SymmetricHashJoinStateManagerV4( private val attachTimestampProjection: UnsafeProjection = TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema) - // Create the specific column family in the store for this join side's KeyWithIndexToValueStore + // Create the specific column family in the store for this join side's TsWithKeyStore. + // Mark as internal so that numKeys counts only primary data, not the secondary index. stateStore.createColFamilyIfAbsent( colFamilyName, keySchema, valueStructType, TimestampAsPrefixKeyStateEncoderSpec(keySchemaWithTimestamp), - useMultipleValuesPerKey = true + useMultipleValuesPerKey = true, + isInternal = true ) private def createKeyRow(key: UnsafeRow, timestamp: Long): UnsafeRow = { @@ -1311,8 +1318,8 @@ abstract class SymmetricHashJoinStateManagerBase( val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None) extends StateStoreHandler( KeyToNumValuesType, keyToNumValuesStateStoreCkptId, handlerSnapshotOptions) { -SnapshotOptions - private val useVirtualColumnFamilies = stateFormatVersion == 3 + + private val useVirtualColumnFamilies = stateFormatVersion >= 3 private val longValueSchema = new StructType().add("value", "long") private val longToUnsafeRow = UnsafeProjection.create(longValueSchema) private val valueRow = longToUnsafeRow(new SpecificInternalRow(longValueSchema)) @@ -1411,7 +1418,7 @@ SnapshotOptions extends StateStoreHandler( KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId, handlerSnapshotOptions) { - private val useVirtualColumnFamilies = stateFormatVersion == 3 + private val useVirtualColumnFamilies = stateFormatVersion >= 3 private val keyWithIndexExprs = keyAttributes :+ Literal(1L) private val keyWithIndexSchema = keySchema.add("index", LongType) private val indexOrdinalInKeyWithIndexRow = keyAttributes.size @@ -1780,28 +1787,44 @@ object SymmetricHashJoinStateManager { inputValueAttributes: Seq[Attribute], joinKeys: Seq[Expression], stateFormatVersion: Int): Map[String, (StructType, StructType)] = { - var result: Map[String, (StructType, StructType)] = Map.empty - - // get the key and value schema for the KeyToNumValues state store val keySchema = StructType( joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) }) - val longValueSchema = new StructType().add("value", "long") - result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema)) - - // get the key and value schema for the KeyWithIndexToValue state store - val keyWithIndexSchema = keySchema.add("index", LongType) - val valueSchema = if (stateFormatVersion == 1) { - inputValueAttributes - } else if (stateFormatVersion == 2 || stateFormatVersion == 3) { - inputValueAttributes :+ AttributeReference("matched", BooleanType)() + + if (stateFormatVersion == 4) { + // V4 uses two column families: KeyWithTsToValues and TsWithKey + val keySchemaWithTimestamp = + TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema) + val valueWithMatchedSchema = + (inputValueAttributes :+ AttributeReference("matched", BooleanType)()).toStructType + val dummyValueSchema = StructType(Array(StructField("__dummy__", NullType))) + + Map( + getStateStoreName(joinSide, KeyWithTsToValuesType) -> + (keySchemaWithTimestamp, valueWithMatchedSchema), + getStateStoreName(joinSide, TsWithKeyType) -> + (keySchemaWithTimestamp, dummyValueSchema)) } else { - throw new IllegalArgumentException("Incorrect state format version! " + - s"version=$stateFormatVersion") - } - result += (getStateStoreName(joinSide, KeyWithIndexToValueType) -> - (keyWithIndexSchema, valueSchema.toStructType)) + var result: Map[String, (StructType, StructType)] = Map.empty + + // get the key and value schema for the KeyToNumValues state store + val longValueSchema = new StructType().add("value", "long") + result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema)) + + // get the key and value schema for the KeyWithIndexToValue state store + val keyWithIndexSchema = keySchema.add("index", LongType) + val valueSchema = if (stateFormatVersion == 1) { + inputValueAttributes + } else if (stateFormatVersion == 2 || stateFormatVersion == 3) { + inputValueAttributes :+ AttributeReference("matched", BooleanType)() + } else { + throw new IllegalArgumentException("Incorrect state format version! " + + s"version=$stateFormatVersion") + } + result += (getStateStoreName(joinSide, KeyWithIndexToValueType) -> + (keyWithIndexSchema, valueSchema.toStructType)) - result + result + } } /** Retrieves the schemas used for join operator state stores that use column families */ @@ -1816,9 +1839,18 @@ object SymmetricHashJoinStateManager { schemas.map { case (colFamilyName, (keySchema, valueSchema)) => + val keyStateEncoderSpec = if (stateFormatVersion == 4) { + if (colFamilyName == getStateStoreName(joinSide, KeyWithTsToValuesType)) { + TimestampAsPostfixKeyStateEncoderSpec(keySchema) + } else { + TimestampAsPrefixKeyStateEncoderSpec(keySchema) + } + } else { + NoPrefixKeyStateEncoderSpec(keySchema) + } colFamilyName -> StateStoreColFamilySchema( colFamilyName, 0, keySchema, 0, valueSchema, - Some(NoPrefixKeyStateEncoderSpec(keySchema)) + Some(keyStateEncoderSpec) ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 12881ce368067..8bb1f609b2b44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -611,6 +611,10 @@ object KeyStateEncoderSpec { case "PrefixKeyScanStateEncoderSpec" => val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt].toInt PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) + case "TimestampAsPostfixKeyStateEncoderSpec" => + TimestampAsPostfixKeyStateEncoderSpec(keySchema) + case "TimestampAsPrefixKeyStateEncoderSpec" => + TimestampAsPrefixKeyStateEncoderSpec(keySchema) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 6cdca9fb5309f..62f45f6445146 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -941,7 +941,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") val useVirtualColumnFamilies = - spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) == 3 + spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >= 3 // Number of shuffle partitions being used is 3 val numStateStoreInstances = if (useVirtualColumnFamilies) { // Only one state store is created per partition if we're using virtual column families diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala new file mode 100644 index 0000000000000..f85c9ecc9545b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.hadoop.fs.Path +import org.scalatest.{Args, Status, Tag} + +import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.tags.SlowSQLTest + +/** + * Trait that overrides test execution to run with state format version 4. + * V4 uses timestamp-based indexing with a secondary index and requires + * RocksDB with virtual column families. The innermost withSQLConf wins, + * so wrapping the test body overrides the V3 setting from the parent trait. + */ +trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins { + + override def testWithVirtualColumnFamilyJoins( + testName: String, testTags: Tag*)(testBody: => Any): Unit = { + super.testWithVirtualColumnFamilyJoins(testName, testTags: _*) { + withSQLConf( + SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "4" + ) { + testBody + } + } + } + + // V4 always uses virtual column families, so skip non-VCF tests. + override def testWithoutVirtualColumnFamilyJoins( + testName: String, testTags: Tag*)(testBody: => Any): Unit = {} + + // Use lazy val because the parent constructor registers tests before + // subclass vals are initialized. + private lazy val testsToSkip = Seq( + // V4 uses 1 store with VCFs instead of V3's 4*partitions layout, + // so metric assertions about number of state store instances differ. + "SPARK-35896: metrics in StateOperatorProgress are output correctly", + // V4 uses different column families and encoder specs than V3; + // overridden in StreamingInnerJoinV4Suite with V4-specific assertions. + "SPARK-51779 Verify StateSchemaV3 writes correct key and value " + + "schemas for join operator" + ) + + override def runTest(testName: String, args: Args): Status = { + if (testsToSkip.exists(testName.contains)) { + org.scalatest.SucceededStatus + } else { + super.runTest(testName, args) + } + } +} + +@SlowSQLTest +class StreamingInnerJoinV4Suite + extends StreamingInnerJoinSuite + with TestWithV4StateFormat { + + import testImplicits._ + + test("SPARK-55628: V4 state format is active in execution plan") { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "ts", + ($"value" * 2) as "leftValue") + .withWatermark("ts", "10 seconds") + val df2 = input2.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "ts", + ($"value" * 3) as "rightValue") + .withWatermark("ts", "10 seconds") + + val joined = df1.join(df2, Seq("key"), "inner") + + testStream(joined)( + AddData(input1, 1), + CheckAnswer(), + Execute { q => + val joinNodes = q.lastExecution.executedPlan.collect { + case j: StreamingSymmetricHashJoinExec => j + } + assert(joinNodes.length == 1) + assert(joinNodes.head.stateFormatVersion == 4) + }, + StopStream + ) + } + + // V4 uses different column families (keyWithTsToValues, tsWithKey) + // with timestamp-based key encoder specs instead of V3's + // keyToNumValues/keyWithIndexToValue. + testWithVirtualColumnFamilyJoins( + "SPARK-55628: verify V4 state schema writes correct key and " + + "value schemas for join operator") { + withTempDir { checkpointDir => + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF() + .select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF() + .select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + val metadataPathPostfix = "state/0/_stateSchema/default" + val stateSchemaPath = + new Path(checkpointDir.toString, s"$metadataPathPostfix") + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = + CheckpointFileManager.create(stateSchemaPath, hadoopConf) + + val keySchemaWithTimestamp = new StructType() + .add("field0", IntegerType, nullable = false) + .add("__event_time", LongType, nullable = false) + + val leftValueSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("leftValue", IntegerType, nullable = false) + .add("matched", BooleanType) + val rightValueSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("rightValue", IntegerType, nullable = false) + .add("matched", BooleanType) + + val dummyValueSchema = + StructType(Array(StructField("__dummy__", NullType))) + + val schemaLeftPrimary = StateStoreColFamilySchema( + "left-keyWithTsToValues", 0, + keySchemaWithTimestamp, 0, leftValueSchema, + Some(TimestampAsPostfixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + val schemaLeftSecondary = StateStoreColFamilySchema( + "left-tsWithKey", 0, + keySchemaWithTimestamp, 0, dummyValueSchema, + Some(TimestampAsPrefixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + val schemaRightPrimary = StateStoreColFamilySchema( + "right-keyWithTsToValues", 0, + keySchemaWithTimestamp, 0, rightValueSchema, + Some(TimestampAsPostfixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + val schemaRightSecondary = StateStoreColFamilySchema( + "right-tsWithKey", 0, + keySchemaWithTimestamp, 0, dummyValueSchema, + Some(TimestampAsPrefixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + + testStream(joined)( + StartStream( + checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1), + CheckAnswer(), + AddData(input2, 1, 10), + CheckNewAnswer((1, 2, 3)), + Execute { q => + val schemaFilePath = + fm.list(stateSchemaPath).toSeq.head.getPath + val providerId = StateStoreProviderId( + StateStoreId(checkpointDir.getCanonicalPath, 0, 0), + q.lastProgress.runId + ) + val checker = new StateSchemaCompatibilityChecker( + providerId, + hadoopConf, + List(schemaFilePath) + ) + val colFamilySeq = checker.readSchemaFile() + assert(colFamilySeq.length == 4) + assert(colFamilySeq.map(_.toString).toSet == Set( + schemaLeftPrimary, schemaLeftSecondary, + schemaRightPrimary, schemaRightSecondary + ).map(_.toString)) + }, + StopStream + ) + } + } +} + +@SlowSQLTest +class StreamingOuterJoinV4Suite + extends StreamingOuterJoinSuite + with TestWithV4StateFormat + +@SlowSQLTest +class StreamingFullOuterJoinV4Suite + extends StreamingFullOuterJoinSuite + with TestWithV4StateFormat + +@SlowSQLTest +class StreamingLeftSemiJoinV4Suite + extends StreamingLeftSemiJoinSuite + with TestWithV4StateFormat From ae8ccd57813779eefffa9a3fda9f5a26429e5582 Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 13:13:59 -0700 Subject: [PATCH 2/7] Trigger CI From 00ccb37fdab682d40dff2d57a486af0207a3e82b Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 14:56:28 -0700 Subject: [PATCH 3/7] add experimental label --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 319fe39a165c4..4b95e1b090680 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3133,7 +3133,8 @@ object SQLConf { "State between versions are tend to be incompatible, so state format version shouldn't " + "be modified after running. Version 3 uses a single state store with virtual column " + "families instead of four stores and is only supported with RocksDB. NOTE: version " + - "1 is DEPRECATED and should not be explicitly set by users.") + "1 is DEPRECATED and should not be explicitly set by users. " + + "Version 4 is experimental and subject to change.") .version("3.0.0") .intConf .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4") From 04b5516d3e8d6794d1d3cbacf0d386d7869bb2fe Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 16:01:20 -0700 Subject: [PATCH 4/7] config flag --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++- .../stateful/join/SymmetricHashJoinStateManager.scala | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4b95e1b090680..c282f0858dd9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3134,12 +3134,21 @@ object SQLConf { "be modified after running. Version 3 uses a single state store with virtual column " + "families instead of four stores and is only supported with RocksDB. NOTE: version " + "1 is DEPRECATED and should not be explicitly set by users. " + - "Version 4 is experimental and subject to change.") + "Version 4 is under development and only available for testing.") .version("3.0.0") .intConf .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4") .createWithDefault(2) + val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED = + buildConf("spark.sql.streaming.join.stateFormatV4.enabled") + .internal() + .doc("When true, enables state format version 4 for stream-stream joins. " + + "This config will be removed once V4 is complete.") + .version("4.2.0") + .booleanConf + .createWithDefaultFunction(() => Utils.isTesting) + val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION = buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition") .doc("When true, streaming session window sorts and merge sessions in local partition " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala index dbc8598bf4366..80b6ebf3ca289 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, WatermarkSupport} import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._ import org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor, KeyStateEncoderSpec, NoopStatePartitionKeyExtractor, NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast, StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema, StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics, StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay, TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeyStateEncoderSpec, TimestampKeyStateEncoder} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType, StructField, StructType} import org.apache.spark.util.NextIterator @@ -1751,6 +1752,8 @@ object SymmetricHashJoinStateManager { snapshotOptions: Option[SnapshotOptions] = None, joinStoreGenerator: JoinStateManagerStoreGenerator): SymmetricHashJoinStateManager = { if (stateFormatVersion == 4) { + require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED), + "State format version 4 is under development.") new SymmetricHashJoinStateManagerV4( joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf, partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId, From 4c79306cd9ada63f8655465fdfcd2223f3a8ec68 Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 18:57:42 -0700 Subject: [PATCH 5/7] revert window join fix --- .../stateful/join/SymmetricHashJoinStateManager.scala | 7 +------ .../apache/spark/sql/streaming/StreamingJoinV4Suite.scala | 3 +++ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala index 80b6ebf3ca289..ae18c49472f7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala @@ -234,10 +234,6 @@ class SymmetricHashJoinStateManagerV4( joinKeys.zipWithIndex.collectFirst { case (ne: NamedExpression, index) if ne.metadata.contains(EventTimeWatermark.delayKey) => index - }.orElse { - keyAttributes.zipWithIndex.collectFirst { - case (attr, index) if attr.dataType.isInstanceOf[StructType] => index - } } } @@ -318,8 +314,7 @@ class SymmetricHashJoinStateManagerV4( private val tsWithKey = new TsWithKeyTypeStore override def append(key: UnsafeRow, value: UnsafeRow, matched: Boolean): Unit = { - val eventTime = extractEventTimeFnFromKey(key) - .getOrElse(extractEventTimeFn(value)) + val eventTime = extractEventTimeFn(value) // We always do blind merge for appending new value. keyWithTsToValues.append(key, eventTime, value, matched) tsWithKey.add(eventTime, key) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala index f85c9ecc9545b..4502bd8cc7af5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala @@ -55,6 +55,9 @@ trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins { // Use lazy val because the parent constructor registers tests before // subclass vals are initialized. private lazy val testsToSkip = Seq( + // V4's timestamp-based indexing does not support window structs + // in join keys. + "stream stream inner join on windows - with watermark", // V4 uses 1 store with VCFs instead of V3's 4*partitions layout, // so metric assertions about number of state store instances differ. "SPARK-35896: metrics in StateOperatorProgress are output correctly", From 9ad85fcb380080f51ec165310b91bededbb22b4a Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 19:06:20 -0700 Subject: [PATCH 6/7] add binding policy --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c282f0858dd9d..f8c2ebb75ddcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3146,6 +3146,7 @@ object SQLConf { .doc("When true, enables state format version 4 for stream-stream joins. " + "This config will be removed once V4 is complete.") .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf .createWithDefaultFunction(() => Utils.isTesting) From 0a71a84de4da4896f85c2b3625dbb33be362b940 Mon Sep 17 00:00:00 2001 From: Nicholas Chew Date: Thu, 12 Mar 2026 20:20:48 -0700 Subject: [PATCH 7/7] spacing fix --- .../spark/sql/streaming/StreamingJoinV4Suite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala index 4502bd8cc7af5..48ce34f611c76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala @@ -78,8 +78,8 @@ trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins { @SlowSQLTest class StreamingInnerJoinV4Suite - extends StreamingInnerJoinSuite - with TestWithV4StateFormat { + extends StreamingInnerJoinSuite + with TestWithV4StateFormat { import testImplicits._ @@ -214,15 +214,15 @@ class StreamingInnerJoinV4Suite @SlowSQLTest class StreamingOuterJoinV4Suite - extends StreamingOuterJoinSuite - with TestWithV4StateFormat + extends StreamingOuterJoinSuite + with TestWithV4StateFormat @SlowSQLTest class StreamingFullOuterJoinV4Suite - extends StreamingFullOuterJoinSuite - with TestWithV4StateFormat + extends StreamingFullOuterJoinSuite + with TestWithV4StateFormat @SlowSQLTest class StreamingLeftSemiJoinV4Suite - extends StreamingLeftSemiJoinSuite + extends StreamingLeftSemiJoinSuite with TestWithV4StateFormat