Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
GlutenPartition(
index,
planByteArray,
wsCtx.queryId,
splitInfos.toArray
)
}
Expand Down Expand Up @@ -315,6 +316,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
inputIterators: Seq[Iterator[ColumnarBatch]],
sparkConf: SparkConf,
rootNode: PlanNode,
queryId: String,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric,
buildThreads: SQLMetric): BuildSideRelation = {
buildThreads: SQLMetric,
queryId: String): BuildSideRelation = {

val (buildKeys, isNullAware) = mode match {
case mode1: HashedRelationBroadcastMode =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ object GlutenDeltaJobStatsTracker extends Logging {
null,
null,
0,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
s"Gluten_Delta_Stats_${UUID.randomUUID()}"
)
nativeOutItr
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ object GlutenDeltaJobStatsTracker extends Logging {
null,
null,
0,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
s"Gluten_Delta_Stats_${UUID.randomUUID()}"
)
nativeOutItr
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public long rtHandle() {
return runtime.getHandle();
}

public static native void clearHashTable(long hashTableData);
public static native void clearHashTable(String cacheKey, long hashTableData);

public static native long cloneHashTable(long hashTableData);
public static native long cloneHashTable(String cacheKey, long hashTableData);

public native long nativeBuild(
String buildHashTableId,
Expand All @@ -51,5 +51,6 @@ public native long nativeBuild(
byte[] namedStruct,
boolean isNullAwareAntiJoin,
long bloomFilterPushdownSize,
int broadcastHashTableBuildThreads);
int broadcastHashTableBuildThreads,
String queryId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
GlutenPartition(
index,
planByteArray,
wsCtx.queryId,
splitInfos.toArray
)
}
Expand Down Expand Up @@ -216,7 +217,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null,
if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null,
partitionIndex,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
inputPartition.queryId
)
resIter.noMoreSplits()
val itrMetrics = IteratorMetricsJniWrapper.create()
Expand All @@ -243,6 +245,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
inputIterators: Seq[Iterator[ColumnarBatch]],
sparkConf: SparkConf,
rootNode: PlanNode,
queryId: String,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
Expand Down Expand Up @@ -270,7 +273,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
null,
if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null,
partitionIndex,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
queryId
)
nativeResultIterator.noMoreSplits()
val itrMetrics = IteratorMetricsJniWrapper.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric,
buildThreads: SQLMetric): BuildSideRelation = {
buildThreads: SQLMetric,
queryId: String): BuildSideRelation = {

val buildKeys = mode match {
case mode1: HashedRelationBroadcastMode =>
Expand Down Expand Up @@ -875,7 +876,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
mode,
newBuildKeys,
offload,
buildThreadsValue)
buildThreadsValue,
queryId)
}
} else {
ColumnarBuildSideRelation(
Expand All @@ -884,7 +886,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
mode,
newBuildKeys,
offload,
buildThreadsValue)
buildThreadsValue,
queryId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.ColumnarBuildSideRelation
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation
import org.apache.spark.sql.vectorized.ColumnarBatch

import io.substrait.proto.JoinRel
Expand Down Expand Up @@ -106,7 +108,7 @@ case class BroadcastHashJoinExecTransformer(
isNullAwareAntiJoin) {

// Unique ID for built table
lazy val buildBroadcastTableId: String = buildPlan.id.toString
lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan)

Comment on lines 110 to 112
override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match {
case _: InnerLike =>
Expand Down Expand Up @@ -134,17 +136,18 @@ case class BroadcastHashJoinExecTransformer(

override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
val streamedRDD = getColumnarInputRDDs(streamedPlan)
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
val queryId = extractQueryId(broadcast.value)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val cacheKey = s"$queryId:$buildHashTableId"
if (executionId != null) {
GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
GlutenDriverEndpoint.collectResources(executionId, cacheKey)
Comment on lines 141 to +144
Comment on lines 141 to +144
Comment on lines 141 to +144
} else {
logWarning(
s"Can not trace broadcast table data $buildBroadcastTableId" +
s"Can not trace broadcast table data $cacheKey" +
s" because execution id is null." +
s" Will clean up until expire time.")
}

val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
val bloomFilterPushdownSize = if (VeloxConfig.get.hashProbeDynamicFilterPushdownEnabled) {
VeloxConfig.get.hashProbeBloomFilterPushdownMaxSize
} else {
Expand Down Expand Up @@ -174,7 +177,7 @@ case class BroadcastHashJoinExecTransformer(
buildPlan.output,
filterBuildColumns,
filterPropagatesNulls,
buildBroadcastTableId,
cacheKey,
isNullAwareAntiJoin,
bloomFilterPushdownSize,
metrics.get("buildHashTableTime")
Expand All @@ -183,6 +186,15 @@ case class BroadcastHashJoinExecTransformer(
// FIXME: Do we have to make build side a RDD?
streamedRDD :+ broadcastRDD
}

/**
 * Resolves the query id used to namespace the broadcast hash-table cache key.
 *
 * The id carried by the broadcast relation itself is preferred when it is non-empty; for any
 * other relation type, or when the carried id is empty, the id is looked up through
 * `GlutenQueryContext.getRequiredQueryId` for the build plan.
 *
 * @param relation
 *   the materialized build-side relation obtained from the broadcast
 * @return
 *   the relation's own query id if non-empty, otherwise the id resolved for `buildPlan`
 */
private def extractQueryId(relation: BuildSideRelation): String = {
  // Pull out whatever id the relation carries; unknown relation types carry none.
  val carriedId = relation match {
    case columnar: ColumnarBuildSideRelation => columnar.queryId
    case unsafe: UnsafeColumnarBuildSideRelation => unsafe.queryIdValue
    case _ => ""
  }
  if (carriedId.nonEmpty) {
    carriedId
  } else {
    GlutenQueryContext.getRequiredQueryId(buildPlan)
  }
}
}

case class BroadcastHashJoinContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache
}
}

HashJoinBuilder.clearHashTable(value.pointer)
HashJoinBuilder.clearHashTable(key, value.pointer)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ object ColumnarBuildSideRelation {
mode: BroadcastMode,
newBuildKeys: Seq[Expression] = Seq.empty,
offload: Boolean = false,
buildThreads: Int = 1): ColumnarBuildSideRelation = {
buildThreads: Int = 1,
queryId: String = ""): ColumnarBuildSideRelation = {
val boundMode = mode match {
case HashedRelationBroadcastMode(keys, isNullAware) =>
// Bind each key to the build-side output so simple cols become BoundReference
Expand All @@ -68,7 +69,8 @@ object ColumnarBuildSideRelation {
BroadcastModeUtils.toSafe(boundMode),
newBuildKeys,
offload,
buildThreads)
buildThreads,
queryId)
}
}

Expand All @@ -78,7 +80,8 @@ case class ColumnarBuildSideRelation(
safeBroadcastMode: SafeBroadcastMode,
newBuildKeys: Seq[Expression],
offload: Boolean,
buildThreads: Int)
buildThreads: Int,
queryId: String)
extends BuildSideRelation
with Logging
with KnownSizeEstimation {
Expand Down Expand Up @@ -223,7 +226,8 @@ case class ColumnarBuildSideRelation(
SubstraitUtil.toNameStruct(newOutput).toByteArray,
broadcastContext.isNullAwareAntiJoin,
broadcastContext.bloomFilterPushdownSize,
buildThreads
buildThreads,
queryId
)

jniWrapper.close(serializeHandle)
Expand All @@ -234,7 +238,7 @@ case class ColumnarBuildSideRelation(

(hashTableData, this)
} else {
(HashJoinBuilder.cloneHashTable(hashTableData), null)
(HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ object UnsafeColumnarBuildSideRelation {
mode: BroadcastMode,
newBuildKeys: Seq[Expression] = Seq.empty,
offload: Boolean = false,
buildThreads: Int = 1): UnsafeColumnarBuildSideRelation = {
buildThreads: Int = 1,
queryId: String = ""): UnsafeColumnarBuildSideRelation = {
val boundMode = mode match {
case HashedRelationBroadcastMode(keys, isNullAware) =>
// Bind each key to the build-side output so simple cols become BoundReference
Expand All @@ -73,7 +74,8 @@ object UnsafeColumnarBuildSideRelation {
BroadcastModeUtils.toSafe(boundMode),
newBuildKeys,
offload,
buildThreads)
buildThreads,
queryId)
}
}

Expand All @@ -94,7 +96,8 @@ class UnsafeColumnarBuildSideRelation(
private var safeBroadcastMode: SafeBroadcastMode,
private var newBuildKeys: Seq[Expression],
private var offload: Boolean,
private var buildThreads: Int)
private var buildThreads: Int,
private var queryId: String)
extends BuildSideRelation
with Externalizable
with Logging
Expand All @@ -114,9 +117,11 @@ class UnsafeColumnarBuildSideRelation(

def isOffload: Boolean = offload

/** Read-only accessor for the private `queryId` field of this relation. */
def queryIdValue: String = this.queryId

/** needed for serialization. */
def this() = {
this(null, null, null, Seq.empty, false, 1)
this(null, null, null, Seq.empty, false, 1, "")
}

private[unsafe] def getBatches(): Seq[UnsafeByteArray] = {
Expand Down Expand Up @@ -193,7 +198,8 @@ class UnsafeColumnarBuildSideRelation(
SubstraitUtil.toNameStruct(newOutput).toByteArray,
broadcastContext.isNullAwareAntiJoin,
broadcastContext.bloomFilterPushdownSize,
buildThreads
buildThreads,
queryId
)

jniWrapper.close(serializeHandle)
Expand All @@ -204,7 +210,7 @@ class UnsafeColumnarBuildSideRelation(

(hashTableData, this)
} else {
(HashJoinBuilder.cloneHashTable(hashTableData), null)
(HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null)
}
}

Expand All @@ -219,6 +225,7 @@ class UnsafeColumnarBuildSideRelation(
out.writeObject(newBuildKeys)
out.writeBoolean(offload)
out.writeInt(buildThreads)
out.writeUTF(queryId)
}

override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
Expand All @@ -228,6 +235,7 @@ class UnsafeColumnarBuildSideRelation(
kryo.writeClassAndObject(out, newBuildKeys)
out.writeBoolean(offload)
out.writeInt(buildThreads)
out.writeString(queryId)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
Expand All @@ -237,6 +245,7 @@ class UnsafeColumnarBuildSideRelation(
newBuildKeys = in.readObject().asInstanceOf[Seq[Expression]]
offload = in.readBoolean()
buildThreads = in.readInt()
queryId = in.readUTF()
}

override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
Expand All @@ -246,6 +255,7 @@ class UnsafeColumnarBuildSideRelation(
newBuildKeys = kryo.readClassAndObject(in).asInstanceOf[Seq[Expression]]
offload = in.readBoolean()
buildThreads = in.readInt()
queryId = in.readString()
}

private def transformProjection: UnsafeProjection = safeBroadcastMode match {
Expand Down
7 changes: 5 additions & 2 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ struct SparkTaskInfo {
int32_t partitionId{0};
// Same as TID.
int64_t taskId{0};
// Same as Spark SQL execution id. -1 means unavailable.
int64_t executionId{-1};
std::string queryId{};
// Virtual id reserved for each backend's internal use.
int32_t vId{0};

std::string toString() const {
return "[Stage: " + std::to_string(stageId) + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) +
"]";
return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) +
" TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]";
}

friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) {
Expand Down
4 changes: 3 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,15 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
jint stageId,
jint partitionId,
jlong taskId,
jlong executionId,
jstring queryId,
jboolean enableDumping,
jstring spillDir) {
JNI_METHOD_START

auto ctx = getRuntime(env, wrapper);

ctx->setSparkTaskInfo({stageId, partitionId, taskId});
ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId, jStringToCString(env, queryId)});

if (enableDumping) {
ctx->enableDumping();
Expand Down
Loading
Loading