diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala index 07b756006525d..672e8e3786181 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala @@ -150,7 +150,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging batchId: Long): Option[Int] = { if (storeMetadata.nonEmpty && storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) { - new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog + new StreamingQueryCheckpointMetadata(session, checkpointLocation, readOnly = true).offsetLog .get(batchId) .flatMap(_.metadataOpt) .flatMap(_.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key)) @@ -178,7 +178,8 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging private def buildSqlConfForBatch( checkpointLocation: String, batchId: Long): SQLConf = { - val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog + val offsetLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).offsetLog offsetLog.get(batchId) match { case Some(value) => val metadata = value.metadataOpt.getOrElse( @@ -764,7 +765,8 @@ object StateSourceOptions extends DataSourceOptions with Logging{ } private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = { - val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog + val commitLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).commitLog commitLog.getLatest() match { case Some((lastId, _)) => lastId case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation) @@ -776,7 +778,8 @@ object StateSourceOptions extends DataSourceOptions with Logging{ batchId: Long, operatorId: Long, checkpointLocation: String): Option[Array[Array[String]]] = { - val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog + val commitLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).commitLog val commitMetadata = commitLog.get(batchId) match { case Some(commitMetadata) => commitMetadata case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala index 6892b6b535cf9..b73020b6060c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala @@ -46,8 +46,11 @@ import org.apache.spark.sql.internal.SQLConf * line 1: version * line 2: metadata (optional json string) */ -class CommitLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[CommitMetadata](sparkSession, path) { +class CommitLog( + sparkSession: SparkSession, + path: String, + readOnly: Boolean = false) + extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) { import CommitLog._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala index 6d35b1a8f8c00..fac502f75f3a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/HDFSMetadataLog.scala @@ -48,7 +48,10 @@ import org.apache.spark.util.Utils * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing * files in a directory always shows the latest files. */ -class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String) +class HDFSMetadataLog[T <: AnyRef : ClassTag]( + sparkSession: SparkSession, + path: String, + readOnly: Boolean = false) extends MetadataLog[T] with Logging { private implicit val formats: Formats = Serialization.formats(NoTypeHints) @@ -66,7 +69,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: protected val fileManager = CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf()) - if (!fileManager.exists(metadataPath)) { + // When readOnly is false and the metadata path does not exist, create the directory + if (!readOnly && !fileManager.exists(metadataPath)) { fileManager.mkdirs(metadataPath) } @@ -327,6 +331,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: /** List the available batches on file system. */ protected def listBatches: Array[Long] = { + // If parent doesn't exist, return empty array rather than throwing an exception + if (!fileManager.exists(metadataPath)) { + return Array.empty + } val batchIds = fileManager.list(metadataPath, batchFilesFilter) // Batches must be files .filter(f => f.isFile) @@ -351,6 +359,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * @return array of batches ids */ def listBatchesOnDisk: Array[Long] = { + // If parent doesn't exist, return empty array rather than throwing an exception + if (!fileManager.exists(metadataPath)) { + return Array.empty + } fileManager.list(metadataPath, batchFilesFilter) .map(f => pathToBatchId(f.getPath)).sorted } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala index 8af138e330c46..ab67915c0151d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala @@ -51,8 +51,11 @@ import org.apache.spark.sql.execution.streaming.runtime.SerializedOffset * 1:{3} // sourceId:offset * ... */ -class OffsetSeqLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path) { +class OffsetSeqLog( + sparkSession: SparkSession, + path: String, + readOnly: Boolean = false) + extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path, readOnly) { override protected def deserialize(in: InputStream): OffsetSeqBase = { // called inside a try-finally where the underlying stream is closed in the caller diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala index 4e02f323b89af..4a6a2e735d377 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingQueryCheckpointMetadata.scala @@ -30,7 +30,10 @@ import org.apache.spark.sql.internal.SQLConf * @param sparkSession Spark session * @param resolvedCheckpointRoot The resolved checkpoint root path */ -class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheckpointRoot: String) { +class StreamingQueryCheckpointMetadata( + sparkSession: SparkSession, + resolvedCheckpointRoot: String, + readOnly: Boolean = false) { /** * A write-ahead-log that records the offsets that are present in each batch. In order to ensure @@ -39,7 +42,9 @@ class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheck * processed and the N-1th entry indicates which offsets have been durably committed to the sink. */ lazy val offsetLog = - new OffsetSeqLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS)) + new OffsetSeqLog(sparkSession, + checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS), + readOnly) /** * A log that records the batch ids that have completed. This is used to check if a batch was @@ -47,7 +52,9 @@ class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheck * This is used (for instance) during restart, to help identify which batch to run next. */ lazy val commitLog = - new CommitLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS)) + new CommitLog(sparkSession, + checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS), + readOnly) /** Metadata associated with the whole query */ final lazy val streamMetadata: StreamMetadata = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala index ac0f42c340074..e046be487999f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala @@ -183,13 +183,15 @@ object OperatorStateMetadataUtils extends Logging { } def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = { - val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog + val offsetLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).offsetLog offsetLog.getLatest().map(_._1).getOrElse(throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation)) } def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = { - val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog + val commitLog = new StreamingQueryCheckpointMetadata( + session, checkpointLocation, readOnly = true).commitLog commitLog.getLatest().map(_._1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index ce29c87bc76ea..05916c4816a9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -1526,6 +1526,107 @@ class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase { s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}") } + test("deleted offsets directory is not recreated on read") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val offsetsDir = new File(tempDir, "offsets") + assert(offsetsDir.exists(), "Offsets directory should exist after running the query") + Utils.deleteRecursively(offsetsDir) + assert(!offsetsDir.exists(), "Offsets directory should be deleted") + + val e1 = intercept[Exception] { + spark.read + .format("statestore") + .option(StateSourceOptions.PATH, checkpointPath) + .load() + .collect() + } + assertCauseChainContains(e1, + classOf[StateDataSourceOffsetLogUnavailable]) + + assert(!offsetsDir.exists(), + "State data source reader should not recreate the deleted offsets directory") + } + } + + test("deleted commits directory is not recreated on read") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val commitsDir = new File(tempDir, "commits") + assert(commitsDir.exists(), "Commits directory should exist after running the query") + Utils.deleteRecursively(commitsDir) + assert(!commitsDir.exists(), "Commits directory should be deleted") + + val e2 = intercept[Exception] { + spark.read + .format("statestore") + .option(StateSourceOptions.PATH, checkpointPath) + .load() + .collect() + } + assertCauseChainContains(e2, + classOf[StataDataSourceCommittedBatchUnavailable]) + + assert(!commitsDir.exists(), + "State data source reader should not recreate the deleted commits directory") + } + } + + test("deleted commits directory is not recreated on read (state-metadata source)") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val commitsDir = new File(tempDir, "commits") + assert(commitsDir.exists(), "Commits directory should exist after running the query") + Utils.deleteRecursively(commitsDir) + assert(!commitsDir.exists(), "Commits directory should be deleted") + + spark.read.format("state-metadata").load(checkpointPath).collect() + + assert(!commitsDir.exists(), + "State-metadata source reader should not recreate the deleted commits directory") + } + } + + test("deleted offsets directory is not recreated on read (state-metadata source)") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val offsetsDir = new File(tempDir, "offsets") + assert(offsetsDir.exists(), "Offsets directory should exist after running the query") + Utils.deleteRecursively(offsetsDir) + assert(!offsetsDir.exists(), "Offsets directory should be deleted") + + spark.read.format("state-metadata").load(checkpointPath).collect() + + assert(!offsetsDir.exists(), + "State-metadata source reader should not recreate the deleted offsets directory") + } + } + + test("deleted state directory is not recreated on read (state-metadata source)") { + withTempDir { tempDir => + val checkpointPath = tempDir.getAbsolutePath + runLargeDataStreamingAggregationQuery(checkpointPath) + + val stateDir = new File(tempDir, "state") + assert(stateDir.exists(), "State directory should exist after running the query") + Utils.deleteRecursively(stateDir) + assert(!stateDir.exists(), "State directory should be deleted") + + spark.read.format("state-metadata").load(checkpointPath).collect() + + assert(!stateDir.exists(), + "State-metadata source reader should not recreate the deleted state directory") + } + } + /** * Runs a stateful query to create the checkpoint structure, deletes the state directory, * then attempts to read via the state data source and verifies that the state directory @@ -1656,5 +1757,4 @@ class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase { } ) } - } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index d6702c1e4ea50..827906258378a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -218,4 +218,24 @@ class HDFSMetadataLogSuite extends SharedSparkSession { intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L))) intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L))) } + + test("HDFSMetadataLog: readOnly=false always creates directory") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath) + assert(dir.exists(), + "HDFSMetadataLog should create directory when readOnly=false (default)") + } + } + + test("HDFSMetadataLog: readOnly=true does not create directory") { + withTempDir { temp => + val dir = new File(temp, "nonexistent") + assert(!dir.exists()) + new HDFSMetadataLog[String](spark, dir.getAbsolutePath, readOnly = true) + assert(!dir.exists(), + "HDFSMetadataLog should not create directory when readOnly=true") + } + } }