From 00ef5c135238e42641457e8468ca54a00c0d7e34 Mon Sep 17 00:00:00 2001 From: Livia Zhu Date: Thu, 19 Feb 2026 10:40:09 -0800 Subject: [PATCH 1/4] metadata log changes --- .../apache/spark/sql/internal/SQLConf.scala | 13 ++++ .../v2/state/StateDataSource.scala | 11 ++- .../streaming/checkpointing/CommitLog.scala | 7 +- .../checkpointing/HDFSMetadataLog.scala | 19 ++++- .../checkpointing/OffsetSeqLog.scala | 7 +- .../StreamingQueryCheckpointMetadata.scala | 13 +++- .../state/OperatorStateMetadata.scala | 6 +- .../v2/state/StateDataSourceReadSuite.scala | 74 +++++++++++++++++++ .../streaming/HDFSMetadataLogSuite.scala | 50 +++++++++++++ 9 files changed, 185 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f17199547665f..bba9281eb8cbc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2584,6 +2584,16 @@ object SQLConf { .createWithDefault( "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider") + val STREAMING_CHECKPOINT_CREATE_DIR_ON_READ = + buildConf("spark.sql.streaming.checkpoint.createDirOnRead") + .internal() + .doc( + "When true, the streaming checkpoint metadata log (offsetLog, commitLog) will create " + + "the parent directory if it doesn't exist during initialization.") + .version("4.2.0") + .booleanConf + .createWithDefault(false) + val NUM_STATE_STORE_MAINTENANCE_THREADS = buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads") .internal() @@ -7088,6 +7098,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS) + def streamingCheckpointCreateDirOnRead: Boolean = + getConf(STREAMING_CHECKPOINT_CREATE_DIR_ON_READ) + def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED) def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala index 9ccbb9a649f25..d99b292128129 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala @@ -150,7 +150,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging batchId: Long): Option[Int] = { if (storeMetadata.nonEmpty && storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) { - new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog + new StreamingQueryCheckpointMetadata(session, checkpointLocation, readOnly = true).offsetLog .get(batchId) .flatMap(_.metadataOpt) .flatMap(_.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key)) @@ -178,7 +178,8 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging private def buildSqlConfForBatch( checkpointLocation: String, batchId: Long): SQLConf = { - val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog + val offsetLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).offsetLog offsetLog.get(batchId) match { case Some(value) => val metadata = value.metadataOpt.getOrElse( @@ -764,7 +765,8 @@ object StateSourceOptions extends DataSourceOptions with Logging{ } private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = { - val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog + val commitLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).commitLog commitLog.getLatest() match { case Some((lastId, _)) => lastId case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation) @@ -776,7 +778,8 @@ object StateSourceOptions extends DataSourceOptions with Logging{ batchId: Long, operatorId: Long, checkpointLocation: String): Option[Array[Array[String]]] = { - val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog + val commitLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).commitLog val commitMetadata = commitLog.get(batchId) match { case Some(commitMetadata) => commitMetadata case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala index 6892b6b535cf9..b73020b6060c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala @@ -46,8 +46,11 @@ import org.apache.spark.sql.internal.SQLConf * line 1: version * line 2: metadata (optional json string) */ -class CommitLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[CommitMetadata](sparkSession, path) { +class CommitLog( + sparkSession: SparkSession, + path: String, + readOnly: Boolean = false) + extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) { import CommitLog._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala index 6d35b1a8f8c00..68ce681535a0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala @@ -48,9 +48,16 @@ import org.apache.spark.util.Utils * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing * files in a directory always shows the latest files. */ -class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String) +class HDFSMetadataLog[T <: AnyRef : ClassTag]( + sparkSession: SparkSession, + path: String, + readOnly: Boolean = false) extends MetadataLog[T] with Logging { + // When readOnly is true, only skip creating dir if streamingCheckpointCreateDirOnRead is false + private val effectiveReadOnly = + readOnly && !sparkSession.sessionState.conf.streamingCheckpointCreateDirOnRead + private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** Needed to serialize type T into JSON when using Jackson */ @@ -66,7 +73,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: protected val fileManager = CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf()) - if (!fileManager.exists(metadataPath)) { + // If this is not a readOnly log or the createDirOnRead conf is true, and the metadata path does + // not exist, create the directory + if (!effectiveReadOnly && !fileManager.exists(metadataPath)) { fileManager.mkdirs(metadataPath) } @@ -327,6 +336,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: /** List the available batches on file system. */ protected def listBatches: Array[Long] = { + if (!fileManager.exists(metadataPath)) { + return Array.empty + } val batchIds = fileManager.list(metadataPath, batchFilesFilter) // Batches must be files .filter(f => f.isFile) @@ -351,6 +363,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * @return array of batches ids */ def listBatchesOnDisk: Array[Long] = { + if (!fileManager.exists(metadataPath)) { + return Array.empty + } fileManager.list(metadataPath, batchFilesFilter) .map(f => pathToBatchId(f.getPath)).sorted } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala index 8af138e330c46..ab67915c0151d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala @@ -51,8 +51,11 @@ import org.apache.spark.sql.execution.streaming.runtime.SerializedOffset * 1:{3} // sourceId:offset * ... */ -class OffsetSeqLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path) { +class OffsetSeqLog( + sparkSession: SparkSession, + path: String, + readOnly: Boolean = false) + extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path, readOnly) { override protected def deserialize(in: InputStream): OffsetSeqBase = { // called inside a try-finally where the underlying stream is closed in the caller diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala index 4e02f323b89af..4a6a2e735d377 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala @@ -30,7 +30,10 @@ import org.apache.spark.sql.internal.SQLConf * @param sparkSession Spark session * @param resolvedCheckpointRoot The resolved checkpoint root path */ -class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheckpointRoot: String) { +class StreamingQueryCheckpointMetadata( + sparkSession: SparkSession, + resolvedCheckpointRoot: String, + readOnly: Boolean = false) { /** * A write-ahead-log that records the offsets that are present in each batch. In order to ensure @@ -39,7 +42,9 @@ class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheck * processed and the N-1th entry indicates which offsets have been durably committed to the sink. */ lazy val offsetLog = - new OffsetSeqLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS)) + new OffsetSeqLog(sparkSession, + checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS), + readOnly) /** * A log that records the batch ids that have completed. This is used to check if a batch was @@ -47,7 +52,9 @@ class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheck * This is used (for instance) during restart, to help identify which batch to run next. */ lazy val commitLog = - new CommitLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS)) + new CommitLog(sparkSession, + checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS), + readOnly) /** Metadata associated with the whole query */ final lazy val streamMetadata: StreamMetadata = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala index 6b2295da03b99..5e218e16ba0b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala @@ -183,13 +183,15 @@ object OperatorStateMetadataUtils extends Logging { } def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = { - val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog + val offsetLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).offsetLog offsetLog.getLatest().map(_._1).getOrElse(throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation)) } def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = { - val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog + val commitLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).commitLog commitLog.getLatest().map(_._1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index 526d39478b915..453925ef0f52d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -1501,3 +1501,77 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass } } } + +/** + * Test suite that verifies the state data source reader does not create empty state + * directories when reading state for all stateful operators. + */ +class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase { + + /** + * Asserts that the cause chain of the given exception contains + * an instance of the expected type. + */ + private def assertCauseChainContains( + e: Throwable, + expectedType: Class[_ <: Throwable]): Unit = { + var current: Throwable = e + while (current != null) { + if (expectedType.isInstance(current)) return + current = current.getCause + } + fail( + s"Expected ${expectedType.getSimpleName} in cause chain, " + + s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}") + } + + test("deleted offsets directory is not recreated on read") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val offsetsDir = new File(tempDir, "offsets") + assert(offsetsDir.exists(), "Offsets directory should exist after running the query") + Utils.deleteRecursively(offsetsDir) + assert(!offsetsDir.exists(), "Offsets directory should be deleted") + + val e1 = intercept[Exception] { + spark.read + .format("statestore") + .option(StateSourceOptions.PATH, checkpointPath) + .load() + .collect() + } + assertCauseChainContains(e1, + classOf[StateDataSourceOffsetLogUnavailable]) + + assert(!offsetsDir.exists(), + "State data source reader should not recreate the deleted offsets directory") + } + } + + test("deleted commits directory is not recreated on read") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val commitsDir = new File(tempDir, "commits") + assert(commitsDir.exists(), "Commits directory should exist after running the query") + Utils.deleteRecursively(commitsDir) + assert(!commitsDir.exists(), "Commits directory should be deleted") + + val e2 = intercept[Exception] { + spark.read + .format("statestore") + .option(StateSourceOptions.PATH, checkpointPath) + .load() + .collect() + } + assertCauseChainContains(e2, + classOf[StataDataSourceCommittedBatchUnavailable]) + + assert(!commitsDir.exists(), + "State data source reader should not recreate the deleted commits directory") + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index d6702c1e4ea50..9bf32bf88fc1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -218,4 +218,54 @@ class HDFSMetadataLogSuite extends SharedSparkSession { intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L))) intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L))) } + + test("HDFSMetadataLog: readOnly=false always creates directory") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath) + assert(dir.exists(), + "HDFSMetadataLog should create directory when readOnly=false (default)") + } + } + + test("HDFSMetadataLog: readOnly=false creates directory even with createDirOnRead=false") { + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_CREATE_DIR_ON_READ.key -> "false") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath) + assert(dir.exists(), + "HDFSMetadataLog should create directory when readOnly=false regardless of conf") + } + } + } + + test("HDFSMetadataLog: readOnly=true with createDirOnRead=true creates directory") { + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_CREATE_DIR_ON_READ.key -> "true") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true) + assert(dir.exists(), + "HDFSMetadataLog should create directory when readOnly=true and createDirOnRead=true") + } + } + } + + test("HDFSMetadataLog: readOnly=true with createDirOnRead=false does not create directory") { + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_CREATE_DIR_ON_READ.key -> "false") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true) + assert(!dir.exists(), + "HDFSMetadataLog should not create directory when readOnly=true " + + "and createDirOnRead=false") + } + } + } } From aec517464aeb01e4e64ea5cd057119e816ef95c2 Mon Sep 17 00:00:00 2001 From: Livia Zhu Date: Thu, 19 Feb 2026 11:01:55 -0800 Subject: [PATCH 2/4] rm conf --- .../apache/spark/sql/internal/SQLConf.scala | 13 ------ .../checkpointing/HDFSMetadataLog.scala | 9 +--- .../streaming/HDFSMetadataLogSuite.scala | 44 +++---------------- 3 files changed, 9 insertions(+), 57 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bba9281eb8cbc..f17199547665f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2584,16 +2584,6 @@ object SQLConf { .createWithDefault( "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider") - val STREAMING_CHECKPOINT_CREATE_DIR_ON_READ = - buildConf("spark.sql.streaming.checkpoint.createDirOnRead") - .internal() - .doc( - "When true, the streaming checkpoint metadata log (offsetLog, commitLog) will create " + - "the parent directory if it doesn't exist during initialization.") - .version("4.2.0") - .booleanConf - .createWithDefault(false) - val NUM_STATE_STORE_MAINTENANCE_THREADS = buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads") .internal() @@ -7098,9 +7088,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS) - def streamingCheckpointCreateDirOnRead: Boolean = - getConf(STREAMING_CHECKPOINT_CREATE_DIR_ON_READ) - def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED) def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala index 68ce681535a0d..edfe91258c34f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala @@ -54,10 +54,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag]( readOnly: Boolean = false) extends MetadataLog[T] with Logging { - // When readOnly is true, only skip creating dir if streamingCheckpointCreateDirOnRead is false - private val effectiveReadOnly = - readOnly && !sparkSession.sessionState.conf.streamingCheckpointCreateDirOnRead - private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** Needed to serialize type T into JSON when using Jackson */ @@ -73,9 +69,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag]( protected val fileManager = CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf()) - // If this is not a readOnly log or the createDirOnRead conf is true, and the metadata path does - // not exist, create the directory - if (!effectiveReadOnly && !fileManager.exists(metadataPath)) { + // When readOnly is false and the metadata path does not exist, create the directory + if (!readOnly && !fileManager.exists(metadataPath)) { fileManager.mkdirs(metadataPath) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 9bf32bf88fc1c..827906258378a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -229,43 +229,13 @@ class HDFSMetadataLogSuite extends SharedSparkSession { } } - test("HDFSMetadataLog: readOnly=false creates directory even with createDirOnRead=false") { - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_CREATE_DIR_ON_READ.key -> "false") { - withTempDir { temp => - val dir = new File(temp, "nonexistent") - assert(!dir.exists()) - new HDFSMetadataLog[String](spark, dir.getAbsolutePath) - assert(dir.exists(), - "HDFSMetadataLog should create directory when readOnly=false regardless of conf") - } - } - } - - test("HDFSMetadataLog: readOnly=true with createDirOnRead=true creates directory") { - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_CREATE_DIR_ON_READ.key -> "true") { - withTempDir { temp => - val dir = new File(temp, "nonexistent") - assert(!dir.exists()) - new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true) - assert(dir.exists(), - "HDFSMetadataLog should create directory when readOnly=true and createDirOnRead=true") - } - } - } - - test("HDFSMetadataLog: readOnly=true with createDirOnRead=false does not create directory") { - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_CREATE_DIR_ON_READ.key -> "false") { - withTempDir { temp => - val dir = new File(temp, "nonexistent") - assert(!dir.exists()) - new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true) - assert(!dir.exists(), - "HDFSMetadataLog should not create directory when readOnly=true " + - "and createDirOnRead=false") - } + test("HDFSMetadataLog: readOnly=true does not create directory") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true) + assert(!dir.exists(), + "HDFSMetadataLog should not create directory when readOnly=true") } } } From 3aa65a90572ed2ba6b600ad027f4d53b98098604 Mon Sep 17 00:00:00 2001 From: Livia Zhu Date: Thu, 12 Mar 2026 15:18:51 -0400 Subject: [PATCH 3/4] add comments --- .../sql/execution/streaming/checkpointing/HDFSMetadataLog.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala index edfe91258c34f..fac502f75f3a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala @@ -331,6 +331,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag]( /** List the available batches on file system. */ protected def listBatches: Array[Long] = { + // If parent doesn't exist, return empty array rather than throwing an exception if (!fileManager.exists(metadataPath)) { return Array.empty } @@ -358,6 +359,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag]( * @return array of batches ids */ def listBatchesOnDisk: Array[Long] = { + // If parent doesn't exist, return empty array rather than throwing an exception if (!fileManager.exists(metadataPath)) { return Array.empty } From f3c39085747a42807103a0de728a792d602e88c2 Mon Sep 17 00:00:00 2001 From: Livia Zhu Date: Thu, 12 Mar 2026 19:12:49 -0400 Subject: [PATCH 4/4] add state metadata tests --- .../v2/state/StateDataSourceReadSuite.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index 921d591b750a3..05916c4816a9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -1576,6 +1576,57 @@ class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase { } } + test("deleted commits directory is not recreated on read (state-metadata source)") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val commitsDir = new File(tempDir, "commits") + assert(commitsDir.exists(), "Commits directory should exist after running the query") + Utils.deleteRecursively(commitsDir) + assert(!commitsDir.exists(), "Commits directory should be deleted") + + spark.read.format("state-metadata").load(checkpointPath).collect() + + assert(!commitsDir.exists(), + "State-metadata source reader should not recreate the deleted commits directory") + } + } + + test("deleted offsets directory is not recreated on read (state-metadata source)") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val offsetsDir = new File(tempDir, "offsets") + assert(offsetsDir.exists(), "Offsets directory should exist after running the query") + Utils.deleteRecursively(offsetsDir) + assert(!offsetsDir.exists(), "Offsets directory should be deleted") + + spark.read.format("state-metadata").load(checkpointPath).collect() + + assert(!offsetsDir.exists(), + "State-metadata source reader should not recreate the deleted offsets directory") + } + } + + test("deleted state directory is not recreated on read (state-metadata source)") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val stateDir = new File(tempDir, "state") + assert(stateDir.exists(), "State directory should exist after running the query") + Utils.deleteRecursively(stateDir) + assert(!stateDir.exists(), "State directory should be deleted") + + spark.read.format("state-metadata").load(checkpointPath).collect() + + assert(!stateDir.exists(), + "State-metadata source reader should not recreate the deleted state directory") + } + } + /** * Runs a stateful query to create the checkpoint structure, deletes the state directory, * then attempts to read via the state data source and verifies that the state directory