diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8c9796b716896..f8c2ebb75ddcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3133,14 +3133,23 @@ object SQLConf { "State between versions are tend to be incompatible, so state format version shouldn't " + "be modified after running. Version 3 uses a single state store with virtual column " + "families instead of four stores and is only supported with RocksDB. NOTE: version " + - "1 is DEPRECATED and should not be explicitly set by users.") + "1 is DEPRECATED and should not be explicitly set by users. " + + "Version 4 is under development and only available for testing.") .version("3.0.0") .intConf - // TODO: [SPARK-55628] Add version 4 once we integrate the state format version 4 into - // stream-stream join operator. - .checkValue(v => Set(1, 2, 3).contains(v), "Valid versions are 1, 2, and 3") + .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4") .createWithDefault(2) + val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED = + buildConf("spark.sql.streaming.join.stateFormatV4.enabled") + .internal() + .doc("When true, enables state format version 4 for stream-stream joins. 
" + + "This config will be removed once V4 is complete.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefaultFunction(() => Utils.isTesting) + val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION = buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition") .doc("When true, streaming session window sorts and merge sessions in local partition " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala index d8ad576bb68a1..95acff230269c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala @@ -198,7 +198,7 @@ case class StreamingSymmetricHashJoinExec( private val allowMultipleStatefulOperators = conf.getConf(SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE) - private val useVirtualColumnFamilies = stateFormatVersion == 3 + private val useVirtualColumnFamilies = stateFormatVersion >= 3 // Determine the store names and metadata version based on format version private val (numStoresPerPartition, _stateStoreNames, _operatorStateMetadataVersion) = @@ -292,8 +292,12 @@ case class StreamingSymmetricHashJoinExec( val info = getStateInfo val stateSchemaDir = stateSchemaDirPath() + // V4 uses VCF like V3, which requires schema version 3. The stateSchemaVersion + // parameter may carry the stateFormatVersion (e.g. 4) from IncrementalExecution, + // so we hardcode 3 here for the VCF path. 
+ val effectiveSchemaVersion = 3 validateAndWriteStateSchema( - hadoopConf, batchId, stateSchemaVersion, info, stateSchemaDir, session + hadoopConf, batchId, effectiveSchemaVersion, info, stateSchemaDir, session ) } else { var result: Map[String, (StructType, StructType)] = Map.empty @@ -437,7 +441,7 @@ case class StreamingSymmetricHashJoinExec( removedRowIter.filterNot { kv => stateFormatVersion match { case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value)) - case 2 | 3 => kv.matched + case 2 | 3 | 4 => kv.matched case _ => throwBadStateFormatVersionException() } }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight)) @@ -463,7 +467,7 @@ case class StreamingSymmetricHashJoinExec( removedRowIter.filterNot { kv => stateFormatVersion match { case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, kv.value)) - case 2 | 3 => kv.matched + case 2 | 3 | 4 => kv.matched case _ => throwBadStateFormatVersionException() } }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) @@ -479,7 +483,7 @@ case class StreamingSymmetricHashJoinExec( case FullOuter => lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) => stateFormatVersion match { - case 2 | 3 => kv.matched + case 2 | 3 | 4 => kv.matched case _ => throwBadStateFormatVersionException() } @@ -801,7 +805,7 @@ case class StreamingSymmetricHashJoinExec( s.evictByKeyCondition(stateKeyWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictByTimestamp(stateWatermark) + s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) => joinStateManager match { @@ -809,7 +813,7 @@ case class StreamingSymmetricHashJoinExec( s.evictByValueCondition(stateValueWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictByTimestamp(stateWatermark) + s.evictByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case _ => 0L } @@ -833,7 +837,7 @@ case class StreamingSymmetricHashJoinExec( 
s.evictAndReturnByKeyCondition(stateKeyWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictAndReturnByTimestamp(stateWatermark) + s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case Some(JoinStateValueWatermarkPredicate(_, stateWatermark)) => joinStateManager match { @@ -841,12 +845,19 @@ case class StreamingSymmetricHashJoinExec( s.evictAndReturnByValueCondition(stateValueWatermarkPredicateFunc) case s: SupportsEvictByTimestamp => - s.evictAndReturnByTimestamp(stateWatermark) + s.evictAndReturnByTimestamp(watermarkMsToStateTimestamp(stateWatermark)) } case _ => Iterator.empty } } + /** + * V4 stores timestamps in microseconds (TimestampType) while the watermark + * is tracked in milliseconds. Convert ms to microseconds for eviction calls. + */ + private def watermarkMsToStateTimestamp(watermarkMs: Long): Long = + watermarkMs * 1000 + /** Commit changes to the buffer state */ def commitState(): Unit = { joinStateManager.commit() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala index 9aa41e1966591..ae18c49472f7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, WatermarkSupport} import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper._ import org.apache.spark.sql.execution.streaming.state.{DropLastNFieldsStatePartitionKeyExtractor, 
KeyStateEncoderSpec, NoopStatePartitionKeyExtractor, NoPrefixKeyStateEncoderSpec, StatePartitionKeyExtractor, StateSchemaBroadcast, StateStore, StateStoreCheckpointInfo, StateStoreColFamilySchema, StateStoreConf, StateStoreErrors, StateStoreId, StateStoreMetrics, StateStoreProvider, StateStoreProviderId, SupportsFineGrainedReplay, TimestampAsPostfixKeyStateEncoderSpec, TimestampAsPrefixKeyStateEncoderSpec, TimestampKeyStateEncoder} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType, StructField, StructType} import org.apache.spark.util.NextIterator @@ -252,9 +253,9 @@ class SymmetricHashJoinStateManagerV4( Seq(StructField("dummy", NullType, nullable = true)) ) - // TODO: [SPARK-55628] Below two fields need to be handled properly during integration with - // the operator. - private val stateStoreCkptId: Option[String] = None + // V4 uses a single store with VCFs (not separate keyToNumValues/keyWithIndexToValue stores). + // Use the keyToNumValues checkpoint ID for loading the correct committed version. + private val stateStoreCkptId: Option[String] = keyToNumValuesStateStoreCkptId private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None private var stateStoreProvider: StateStoreProvider = _ @@ -496,7 +497,7 @@ class SymmetricHashJoinStateManagerV4( private val attachTimestampProjection: UnsafeProjection = TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema) - // Create the specific column family in the store for this join side's KeyWithIndexToValueStore + // Create the specific column family in the store for this join side's KeyWithTsToValuesStore. 
stateStore.createColFamilyIfAbsent( colFamilyName, keySchema, @@ -648,13 +649,15 @@ class SymmetricHashJoinStateManagerV4( private val attachTimestampProjection: UnsafeProjection = TimestampKeyStateEncoder.getAttachTimestampProjection(keySchema) - // Create the specific column family in the store for this join side's KeyWithIndexToValueStore + // Create the specific column family in the store for this join side's TsWithKeyStore. + // Mark as internal so that numKeys counts only primary data, not the secondary index. stateStore.createColFamilyIfAbsent( colFamilyName, keySchema, valueStructType, TimestampAsPrefixKeyStateEncoderSpec(keySchemaWithTimestamp), - useMultipleValuesPerKey = true + useMultipleValuesPerKey = true, + isInternal = true ) private def createKeyRow(key: UnsafeRow, timestamp: Long): UnsafeRow = { @@ -1311,8 +1314,8 @@ abstract class SymmetricHashJoinStateManagerBase( val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None) extends StateStoreHandler( KeyToNumValuesType, keyToNumValuesStateStoreCkptId, handlerSnapshotOptions) { -SnapshotOptions - private val useVirtualColumnFamilies = stateFormatVersion == 3 + + private val useVirtualColumnFamilies = stateFormatVersion >= 3 private val longValueSchema = new StructType().add("value", "long") private val longToUnsafeRow = UnsafeProjection.create(longValueSchema) private val valueRow = longToUnsafeRow(new SpecificInternalRow(longValueSchema)) @@ -1411,7 +1414,7 @@ SnapshotOptions extends StateStoreHandler( KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId, handlerSnapshotOptions) { - private val useVirtualColumnFamilies = stateFormatVersion == 3 + private val useVirtualColumnFamilies = stateFormatVersion >= 3 private val keyWithIndexExprs = keyAttributes :+ Literal(1L) private val keyWithIndexSchema = keySchema.add("index", LongType) private val indexOrdinalInKeyWithIndexRow = keyAttributes.size @@ -1744,6 +1747,8 @@ object SymmetricHashJoinStateManager { snapshotOptions: 
Option[SnapshotOptions] = None, joinStoreGenerator: JoinStateManagerStoreGenerator): SymmetricHashJoinStateManager = { if (stateFormatVersion == 4) { + require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED), + "State format version 4 is under development.") new SymmetricHashJoinStateManagerV4( joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf, partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId, @@ -1780,28 +1785,44 @@ object SymmetricHashJoinStateManager { inputValueAttributes: Seq[Attribute], joinKeys: Seq[Expression], stateFormatVersion: Int): Map[String, (StructType, StructType)] = { - var result: Map[String, (StructType, StructType)] = Map.empty - - // get the key and value schema for the KeyToNumValues state store val keySchema = StructType( joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) }) - val longValueSchema = new StructType().add("value", "long") - result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema)) - - // get the key and value schema for the KeyWithIndexToValue state store - val keyWithIndexSchema = keySchema.add("index", LongType) - val valueSchema = if (stateFormatVersion == 1) { - inputValueAttributes - } else if (stateFormatVersion == 2 || stateFormatVersion == 3) { - inputValueAttributes :+ AttributeReference("matched", BooleanType)() + + if (stateFormatVersion == 4) { + // V4 uses two column families: KeyWithTsToValues and TsWithKey + val keySchemaWithTimestamp = + TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema) + val valueWithMatchedSchema = + (inputValueAttributes :+ AttributeReference("matched", BooleanType)()).toStructType + val dummyValueSchema = StructType(Array(StructField("__dummy__", NullType))) + + Map( + getStateStoreName(joinSide, KeyWithTsToValuesType) -> + (keySchemaWithTimestamp, valueWithMatchedSchema), + getStateStoreName(joinSide, TsWithKeyType) -> + 
(keySchemaWithTimestamp, dummyValueSchema)) } else { - throw new IllegalArgumentException("Incorrect state format version! " + - s"version=$stateFormatVersion") - } - result += (getStateStoreName(joinSide, KeyWithIndexToValueType) -> - (keyWithIndexSchema, valueSchema.toStructType)) + var result: Map[String, (StructType, StructType)] = Map.empty + + // get the key and value schema for the KeyToNumValues state store + val longValueSchema = new StructType().add("value", "long") + result += (getStateStoreName(joinSide, KeyToNumValuesType) -> (keySchema, longValueSchema)) + + // get the key and value schema for the KeyWithIndexToValue state store + val keyWithIndexSchema = keySchema.add("index", LongType) + val valueSchema = if (stateFormatVersion == 1) { + inputValueAttributes + } else if (stateFormatVersion == 2 || stateFormatVersion == 3) { + inputValueAttributes :+ AttributeReference("matched", BooleanType)() + } else { + throw new IllegalArgumentException("Incorrect state format version! 
" + + s"version=$stateFormatVersion") + } + result += (getStateStoreName(joinSide, KeyWithIndexToValueType) -> + (keyWithIndexSchema, valueSchema.toStructType)) - result + result + } } /** Retrieves the schemas used for join operator state stores that use column families */ @@ -1816,9 +1837,18 @@ object SymmetricHashJoinStateManager { schemas.map { case (colFamilyName, (keySchema, valueSchema)) => + val keyStateEncoderSpec = if (stateFormatVersion == 4) { + if (colFamilyName == getStateStoreName(joinSide, KeyWithTsToValuesType)) { + TimestampAsPostfixKeyStateEncoderSpec(keySchema) + } else { + TimestampAsPrefixKeyStateEncoderSpec(keySchema) + } + } else { + NoPrefixKeyStateEncoderSpec(keySchema) + } colFamilyName -> StateStoreColFamilySchema( colFamilyName, 0, keySchema, 0, valueSchema, - Some(NoPrefixKeyStateEncoderSpec(keySchema)) + Some(keyStateEncoderSpec) ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 12881ce368067..8bb1f609b2b44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -611,6 +611,10 @@ object KeyStateEncoderSpec { case "PrefixKeyScanStateEncoderSpec" => val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[BigInt].toInt PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) + case "TimestampAsPostfixKeyStateEncoderSpec" => + TimestampAsPostfixKeyStateEncoderSpec(keySchema) + case "TimestampAsPrefixKeyStateEncoderSpec" => + TimestampAsPrefixKeyStateEncoderSpec(keySchema) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 6cdca9fb5309f..62f45f6445146 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -941,7 +941,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") val useVirtualColumnFamilies = - spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) == 3 + spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >= 3 // Number of shuffle partitions being used is 3 val numStateStoreInstances = if (useVirtualColumnFamilies) { // Only one state store is created per partition if we're using virtual column families diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala new file mode 100644 index 0000000000000..48ce34f611c76 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.hadoop.fs.Path +import org.scalatest.{Args, Status, Tag} + +import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.tags.SlowSQLTest + +/** + * Trait that overrides test execution to run with state format version 4. + * V4 uses timestamp-based indexing with a secondary index and requires + * RocksDB with virtual column families. The innermost withSQLConf wins, + * so wrapping the test body overrides the V3 setting from the parent trait. + */ +trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins { + + override def testWithVirtualColumnFamilyJoins( + testName: String, testTags: Tag*)(testBody: => Any): Unit = { + super.testWithVirtualColumnFamilyJoins(testName, testTags: _*) { + withSQLConf( + SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "4" + ) { + testBody + } + } + } + + // V4 always uses virtual column families, so skip non-VCF tests. + override def testWithoutVirtualColumnFamilyJoins( + testName: String, testTags: Tag*)(testBody: => Any): Unit = {} + + // Use lazy val because the parent constructor registers tests before + // subclass vals are initialized. + private lazy val testsToSkip = Seq( + // V4's timestamp-based indexing does not support window structs + // in join keys. + "stream stream inner join on windows - with watermark", + // V4's state store layout and metrics differ from earlier versions + // (V2 uses four stores; V3 a single VCF store), so these assertions fail. 
+ "SPARK-35896: metrics in StateOperatorProgress are output correctly", + // V4 uses different column families and encoder specs than V3; + // overridden in StreamingInnerJoinV4Suite with V4-specific assertions. + "SPARK-51779 Verify StateSchemaV3 writes correct key and value " + + "schemas for join operator" + ) + + override def runTest(testName: String, args: Args): Status = { + if (testsToSkip.exists(testName.contains)) { + org.scalatest.SucceededStatus + } else { + super.runTest(testName, args) + } + } +} + +@SlowSQLTest +class StreamingInnerJoinV4Suite + extends StreamingInnerJoinSuite + with TestWithV4StateFormat { + + import testImplicits._ + + test("SPARK-55628: V4 state format is active in execution plan") { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "ts", + ($"value" * 2) as "leftValue") + .withWatermark("ts", "10 seconds") + val df2 = input2.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "ts", + ($"value" * 3) as "rightValue") + .withWatermark("ts", "10 seconds") + + val joined = df1.join(df2, Seq("key"), "inner") + + testStream(joined)( + AddData(input1, 1), + CheckAnswer(), + Execute { q => + val joinNodes = q.lastExecution.executedPlan.collect { + case j: StreamingSymmetricHashJoinExec => j + } + assert(joinNodes.length == 1) + assert(joinNodes.head.stateFormatVersion == 4) + }, + StopStream + ) + } + + // V4 uses different column families (keyWithTsToValues, tsWithKey) + // with timestamp-based key encoder specs instead of V3's + // keyToNumValues/keyWithIndexToValue. 
+ testWithVirtualColumnFamilyJoins( + "SPARK-55628: verify V4 state schema writes correct key and " + + "value schemas for join operator") { + withTempDir { checkpointDir => + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF() + .select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF() + .select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + val metadataPathPostfix = "state/0/_stateSchema/default" + val stateSchemaPath = + new Path(checkpointDir.toString, s"$metadataPathPostfix") + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = + CheckpointFileManager.create(stateSchemaPath, hadoopConf) + + val keySchemaWithTimestamp = new StructType() + .add("field0", IntegerType, nullable = false) + .add("__event_time", LongType, nullable = false) + + val leftValueSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("leftValue", IntegerType, nullable = false) + .add("matched", BooleanType) + val rightValueSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("rightValue", IntegerType, nullable = false) + .add("matched", BooleanType) + + val dummyValueSchema = + StructType(Array(StructField("__dummy__", NullType))) + + val schemaLeftPrimary = StateStoreColFamilySchema( + "left-keyWithTsToValues", 0, + keySchemaWithTimestamp, 0, leftValueSchema, + Some(TimestampAsPostfixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + val schemaLeftSecondary = StateStoreColFamilySchema( + "left-tsWithKey", 0, + keySchemaWithTimestamp, 0, dummyValueSchema, + Some(TimestampAsPrefixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + val schemaRightPrimary = StateStoreColFamilySchema( + "right-keyWithTsToValues", 0, + keySchemaWithTimestamp, 0, rightValueSchema, + Some(TimestampAsPostfixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + val schemaRightSecondary = 
StateStoreColFamilySchema( + "right-tsWithKey", 0, + keySchemaWithTimestamp, 0, dummyValueSchema, + Some(TimestampAsPrefixKeyStateEncoderSpec( + keySchemaWithTimestamp)), + None + ) + + testStream(joined)( + StartStream( + checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1), + CheckAnswer(), + AddData(input2, 1, 10), + CheckNewAnswer((1, 2, 3)), + Execute { q => + val schemaFilePath = + fm.list(stateSchemaPath).toSeq.head.getPath + val providerId = StateStoreProviderId( + StateStoreId(checkpointDir.getCanonicalPath, 0, 0), + q.lastProgress.runId + ) + val checker = new StateSchemaCompatibilityChecker( + providerId, + hadoopConf, + List(schemaFilePath) + ) + val colFamilySeq = checker.readSchemaFile() + assert(colFamilySeq.length == 4) + assert(colFamilySeq.map(_.toString).toSet == Set( + schemaLeftPrimary, schemaLeftSecondary, + schemaRightPrimary, schemaRightSecondary + ).map(_.toString)) + }, + StopStream + ) + } + } +} + +@SlowSQLTest +class StreamingOuterJoinV4Suite + extends StreamingOuterJoinSuite + with TestWithV4StateFormat + +@SlowSQLTest +class StreamingFullOuterJoinV4Suite + extends StreamingFullOuterJoinSuite + with TestWithV4StateFormat + +@SlowSQLTest +class StreamingLeftSemiJoinV4Suite + extends StreamingLeftSemiJoinSuite + with TestWithV4StateFormat