diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 4ab944898bd..fff50a4df1d 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -42,12 +42,14 @@ struct SparkTaskInfo { int32_t partitionId{0}; // Same as TID. int64_t taskId{0}; + // Same as Spark SQL execution id. -1 means unavailable. + int64_t executionId{-1}; // virtual id for each backend internal use int32_t vId{0}; std::string toString() const { - return "[Stage: " + std::to_string(stageId) + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + - "]"; + return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) + + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]"; } friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) { diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index b1580782c01..78b85317db0 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -468,13 +468,14 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jint stageId, jint partitionId, jlong taskId, + jlong executionId, jboolean enableDumping, jstring spillDir) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); - ctx->setSparkTaskInfo({stageId, partitionId, taskId}); + ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId}); if (enableDumping) { ctx->enableDumping(); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c3ac095cdc7..21e27d54c46 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -72,6 +72,19 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; } // namespace +namespace { +std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { + if (taskInfo.executionId != -1) { + return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); + } + return fmt::format( + "Gluten_Stage_{}_TID_{}_VTID_{}", + std::to_string(taskInfo.stageId), + std::to_string(taskInfo.taskId), + std::to_string(taskInfo.vId)); +} +} // namespace + WholeStageResultIterator::WholeStageResultIterator( VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, @@ -111,11 +124,7 @@ WholeStageResultIterator::WholeStageResultIterator( velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; std::shared_ptr queryCtx = createNewVeloxQueryCtx(); task_ = velox::exec::Task::create( - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo.vId)), + getVeloxTaskId(taskInfo_), std::move(planFragment), 0, std::move(queryCtx), @@ -233,11 +242,7 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), spillExecutor_, - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo_.vId))); + getVeloxTaskId(taskInfo_)); return ctx; } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java index 6d2c90896b2..8ddbb4b3d39 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java @@ -34,6 +34,8 @@ public class NativePlanEvaluator { private static final Logger LOGGER = LoggerFactory.getLogger(NativePlanEvaluator.class); private static final AtomicInteger id = new AtomicInteger(0); + private static final long INVALID_EXECUTION_ID = -1L; + private static final String SPARK_EXECUTION_ID_KEY = "spark.sql.execution.id"; private final Runtime runtime; private final PlanEvaluatorJniWrapper jniWrapper; @@ -78,14 +80,16 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( int partitionIndex, String spillDirPath) throws RuntimeException { + final TaskContext taskContext = TaskContext.get(); final long itrHandle = jniWrapper.nativeCreateKernelWithIterator( wsPlan, splitInfo, iterList, - TaskContext.get().stageId(), + taskContext.stageId(), partitionIndex, // TaskContext.getPartitionId(), - TaskContext.get().taskAttemptId(), + taskContext.taskAttemptId(), + getExecutionId(taskContext), DebugUtil.isDumpingEnabledForTask(), spillDirPath); final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle); @@ -113,4 +117,18 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) { return new ColumnarBatchOutIterator(runtime, itrHandle); } + + private static long getExecutionId(TaskContext taskContext) { + final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY); + if (executionId == null) { + return INVALID_EXECUTION_ID; + } + try { + return Long.parseLong(executionId); + } catch (NumberFormatException e) { + LOGGER.warn( + "Invalid Spark execution id '{}', fallback to {}", executionId, INVALID_EXECUTION_ID); + return INVALID_EXECUTION_ID; + } + } } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java index a8082906798..f0c3d804bfe 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java @@ -76,6 +76,7 @@ public native long nativeCreateKernelWithIterator( int stageId, int partitionId, long taskId, + long executionId, boolean enableDumping, String spillDir) throws RuntimeException;