Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
batchId: Long): Option[Int] = {
if (storeMetadata.nonEmpty &&
storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
new StreamingQueryCheckpointMetadata(session, checkpointLocation, readOnly = true).offsetLog
.get(batchId)
.flatMap(_.metadataOpt)
.flatMap(_.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key))
Expand Down Expand Up @@ -178,7 +178,8 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
private def buildSqlConfForBatch(
checkpointLocation: String,
batchId: Long): SQLConf = {
val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
val offsetLog = new StreamingQueryCheckpointMetadata(
session, checkpointLocation, readOnly = true).offsetLog
offsetLog.get(batchId) match {
case Some(value) =>
val metadata = value.metadataOpt.getOrElse(
Expand Down Expand Up @@ -764,7 +765,8 @@ object StateSourceOptions extends DataSourceOptions with Logging{
}

private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = {
val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
val commitLog = new StreamingQueryCheckpointMetadata(
session, checkpointLocation, readOnly = true).commitLog
commitLog.getLatest() match {
case Some((lastId, _)) => lastId
case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
Expand All @@ -776,7 +778,8 @@ object StateSourceOptions extends DataSourceOptions with Logging{
batchId: Long,
operatorId: Long,
checkpointLocation: String): Option[Array[Array[String]]] = {
val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
val commitLog = new StreamingQueryCheckpointMetadata(
session, checkpointLocation, readOnly = true).commitLog
val commitMetadata = commitLog.get(batchId) match {
case Some(commitMetadata) => commitMetadata
case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ import org.apache.spark.sql.internal.SQLConf
* line 1: version
* line 2: metadata (optional json string)
*/
class CommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
class CommitLog(
sparkSession: SparkSession,
path: String,
readOnly: Boolean = false)
extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) {

import CommitLog._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ import org.apache.spark.util.Utils
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
* files in a directory always shows the latest files.
*/
class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
class HDFSMetadataLog[T <: AnyRef : ClassTag](
sparkSession: SparkSession,
path: String,
readOnly: Boolean = false)
extends MetadataLog[T] with Logging {

private implicit val formats: Formats = Serialization.formats(NoTypeHints)
Expand All @@ -66,7 +69,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
protected val fileManager =
CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf())

if (!fileManager.exists(metadataPath)) {
// When readOnly is false and the metadata path does not exist, create the directory
if (!readOnly && !fileManager.exists(metadataPath)) {
fileManager.mkdirs(metadataPath)
}

Expand Down Expand Up @@ -327,6 +331,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:

/** List the available batches on file system. */
protected def listBatches: Array[Long] = {
// If parent doesn't exist, return empty array rather than throwing an exception
if (!fileManager.exists(metadataPath)) {
return Array.empty
}
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
// Batches must be files
.filter(f => f.isFile)
Expand All @@ -351,6 +359,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
* @return array of batches ids
*/
def listBatchesOnDisk: Array[Long] = {
  // A missing metadata directory simply means no batch files have ever been
  // written (or the directory was deleted); report that as an empty result
  // instead of letting `list` throw on the nonexistent path.
  if (fileManager.exists(metadataPath)) {
    fileManager.list(metadataPath, batchFilesFilter)
      .map(status => pathToBatchId(status.getPath))
      .sorted
  } else {
    Array.empty
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ import org.apache.spark.sql.execution.streaming.runtime.SerializedOffset
* 1:{3} // sourceId:offset
* ...
*/
class OffsetSeqLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path) {
class OffsetSeqLog(
sparkSession: SparkSession,
path: String,
readOnly: Boolean = false)
extends HDFSMetadataLog[OffsetSeqBase](sparkSession, path, readOnly) {

override protected def deserialize(in: InputStream): OffsetSeqBase = {
// called inside a try-finally where the underlying stream is closed in the caller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import org.apache.spark.sql.internal.SQLConf
* @param sparkSession Spark session
* @param resolvedCheckpointRoot The resolved checkpoint root path
*/
class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheckpointRoot: String) {
class StreamingQueryCheckpointMetadata(
sparkSession: SparkSession,
resolvedCheckpointRoot: String,
readOnly: Boolean = false) {

/**
* A write-ahead-log that records the offsets that are present in each batch. In order to ensure
Expand All @@ -39,15 +42,19 @@ class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheck
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
// Propagate `readOnly` so that merely inspecting a checkpoint (e.g. from the
// state data source) never creates the offsets directory as a side effect.
lazy val offsetLog =
  new OffsetSeqLog(sparkSession,
    checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
    readOnly)

/**
* A log that records the batch ids that have completed. This is used to check if a batch was
* fully processed, and its output was committed to the sink, hence no need to process it again.
* This is used (for instance) during restart, to help identify which batch to run next.
*/
// Propagate `readOnly` so that merely inspecting a checkpoint (e.g. from the
// state data source) never creates the commits directory as a side effect.
lazy val commitLog =
  new CommitLog(sparkSession,
    checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
    readOnly)

/** Metadata associated with the whole query */
final lazy val streamMetadata: StreamMetadata = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,15 @@ object OperatorStateMetadataUtils extends Logging {
}

/**
 * Returns the latest batch id recorded in the offset log of the given checkpoint.
 *
 * The checkpoint metadata is opened read-only so this lookup never creates
 * checkpoint directories as a side effect.
 *
 * @throws org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceException
 *         if the offset log has no entries (e.g. the directory was deleted).
 */
def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
  val offsetLog = new StreamingQueryCheckpointMetadata(
    session, checkpointLocation, readOnly = true).offsetLog
  offsetLog.getLatest().map(_._1).getOrElse(throw
    StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
}

/**
 * Returns the latest committed batch id from the commit log of the given
 * checkpoint, or None if no batch has been committed.
 *
 * The checkpoint metadata is opened read-only so this lookup never creates
 * checkpoint directories as a side effect.
 */
def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = {
  val commitLog = new StreamingQueryCheckpointMetadata(
    session, checkpointLocation, readOnly = true).commitLog
  commitLog.getLatest().map(_._1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,107 @@ class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase {
s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}")
}

// Regression test: reading via the state data source opens the checkpoint
// metadata read-only, so it must not re-create a deleted `offsets` directory.
test("deleted offsets directory is not recreated on read") {
  withTempDir { tempDir =>
    val checkpointPath = tempDir.getAbsolutePath
    // Produce a complete checkpoint (offsets/commits/state) to read back.
    runLargeDataStreamingAggregationQuery(checkpointPath)

    val offsetsDir = new File(tempDir, "offsets")
    assert(offsetsDir.exists(), "Offsets directory should exist after running the query")
    Utils.deleteRecursively(offsetsDir)
    assert(!offsetsDir.exists(), "Offsets directory should be deleted")

    // With the offset log gone, the read should fail with the dedicated
    // offset-log-unavailable error somewhere in the cause chain...
    val e1 = intercept[Exception] {
      spark.read
        .format("statestore")
        .option(StateSourceOptions.PATH, checkpointPath)
        .load()
        .collect()
    }
    assertCauseChainContains(e1,
      classOf[StateDataSourceOffsetLogUnavailable])

    // ...and, crucially, must not have re-created the directory as a side effect.
    assert(!offsetsDir.exists(),
      "State data source reader should not recreate the deleted offsets directory")
  }
}

// Regression test: reading via the state data source opens the checkpoint
// metadata read-only, so it must not re-create a deleted `commits` directory.
test("deleted commits directory is not recreated on read") {
  withTempDir { tempDir =>
    val checkpointPath = tempDir.getAbsolutePath
    // Produce a complete checkpoint (offsets/commits/state) to read back.
    runLargeDataStreamingAggregationQuery(checkpointPath)

    val commitsDir = new File(tempDir, "commits")
    assert(commitsDir.exists(), "Commits directory should exist after running the query")
    Utils.deleteRecursively(commitsDir)
    assert(!commitsDir.exists(), "Commits directory should be deleted")

    // With the commit log gone, the read should surface the
    // committed-batch-unavailable error in the cause chain.
    // NOTE(review): `StataDataSourceCommittedBatchUnavailable` looks misspelled
    // ("Stata") — presumably it matches the actual error class name declared in
    // StateDataSourceErrors; confirm before renaming.
    val e2 = intercept[Exception] {
      spark.read
        .format("statestore")
        .option(StateSourceOptions.PATH, checkpointPath)
        .load()
        .collect()
    }
    assertCauseChainContains(e2,
      classOf[StataDataSourceCommittedBatchUnavailable])

    // The failed read must not have re-created the directory as a side effect.
    assert(!commitsDir.exists(),
      "State data source reader should not recreate the deleted commits directory")
  }
}

// Regression test: the `state-metadata` source does not need the commit log at
// all, so reading should succeed AND must not re-create the deleted directory.
test("deleted commits directory is not recreated on read (state-metadata source)") {
  withTempDir { tempDir =>
    val checkpointPath = tempDir.getAbsolutePath
    // Produce a complete checkpoint (offsets/commits/state) to read back.
    runLargeDataStreamingAggregationQuery(checkpointPath)

    val commitsDir = new File(tempDir, "commits")
    assert(commitsDir.exists(), "Commits directory should exist after running the query")
    Utils.deleteRecursively(commitsDir)
    assert(!commitsDir.exists(), "Commits directory should be deleted")

    // Should succeed without the commit log; collect() forces the read.
    spark.read.format("state-metadata").load(checkpointPath).collect()

    assert(!commitsDir.exists(),
      "State-metadata source reader should not recreate the deleted commits directory")
  }
}

// Regression test: the `state-metadata` source does not need the offset log at
// all, so reading should succeed AND must not re-create the deleted directory.
test("deleted offsets directory is not recreated on read (state-metadata source)") {
  withTempDir { tempDir =>
    val checkpointPath = tempDir.getAbsolutePath
    // Produce a complete checkpoint (offsets/commits/state) to read back.
    runLargeDataStreamingAggregationQuery(checkpointPath)

    val offsetsDir = new File(tempDir, "offsets")
    assert(offsetsDir.exists(), "Offsets directory should exist after running the query")
    Utils.deleteRecursively(offsetsDir)
    assert(!offsetsDir.exists(), "Offsets directory should be deleted")

    // Should succeed without the offset log; collect() forces the read.
    spark.read.format("state-metadata").load(checkpointPath).collect()

    assert(!offsetsDir.exists(),
      "State-metadata source reader should not recreate the deleted offsets directory")
  }
}

// Regression test: the `state-metadata` source reads operator metadata only,
// so a deleted `state` directory should neither fail the read nor be recreated.
test("deleted state directory is not recreated on read (state-metadata source)") {
  withTempDir { tempDir =>
    val checkpointPath = tempDir.getAbsolutePath
    // Produce a complete checkpoint (offsets/commits/state) to read back.
    runLargeDataStreamingAggregationQuery(checkpointPath)

    val stateDir = new File(tempDir, "state")
    assert(stateDir.exists(), "State directory should exist after running the query")
    Utils.deleteRecursively(stateDir)
    assert(!stateDir.exists(), "State directory should be deleted")

    // Should succeed without the state directory; collect() forces the read.
    spark.read.format("state-metadata").load(checkpointPath).collect()

    assert(!stateDir.exists(),
      "State-metadata source reader should not recreate the deleted state directory")
  }
}

/**
* Runs a stateful query to create the checkpoint structure, deletes the state directory,
* then attempts to read via the state data source and verifies that the state directory
Expand Down Expand Up @@ -1656,5 +1757,4 @@ class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase {
}
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,24 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L)))
intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L)))
}

// Constructing the log in the default (writable) mode must eagerly create the
// metadata directory, preserving the pre-readOnly behavior for writers.
test("HDFSMetadataLog: readOnly=false always creates directory") {
  withTempDir { parentDir =>
    val logDir = new File(parentDir, "nonexistent")
    assert(!logDir.exists())
    // Side effect under test: the constructor mkdirs the path when writable.
    new HDFSMetadataLog[String](spark, logDir.getAbsolutePath)
    assert(
      logDir.exists(),
      "HDFSMetadataLog should create directory when readOnly=false (default)")
  }
}

// Constructing the log in read-only mode must not touch the file system:
// readers of a checkpoint should leave no directories behind.
test("HDFSMetadataLog: readOnly=true does not create directory") {
  withTempDir { parentDir =>
    val logDir = new File(parentDir, "nonexistent")
    assert(!logDir.exists())
    // Side effect under test: the constructor skips mkdirs when read-only.
    new HDFSMetadataLog[String](spark, logDir.getAbsolutePath, readOnly = true)
    assert(
      !logDir.exists(),
      "HDFSMetadataLog should not create directory when readOnly=true")
  }
}
}