Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ struct SparkTaskInfo {
int32_t partitionId{0};
// Same as TID.
int64_t taskId{0};
// Same as Spark SQL execution id. -1 means unavailable.
int64_t executionId{-1};
// virtual id for each backend internal use
int32_t vId{0};

// Renders the task identity as
// "[Stage: <stageId> Execution: <executionId> TID: <taskId> VID: <vId>]".
// Only one return statement is kept: previously an earlier return without the
// execution id shadowed this one and made it unreachable.
std::string toString() const {
  return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) +
      " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]";
}

friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,14 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
jint stageId,
jint partitionId,
jlong taskId,
jlong executionId,
jboolean enableDumping,
jstring spillDir) {
JNI_METHOD_START

auto ctx = getRuntime(env, wrapper);

ctx->setSparkTaskInfo({stageId, partitionId, taskId});
ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId});

if (enableDumping) {
ctx->enableDumping();
Expand Down
25 changes: 15 additions & 10 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";

} // namespace

namespace {
// Builds the identifier string derived from a SparkTaskInfo.
//
// Returns "Gluten_Execution_<executionId>" when taskInfo.executionId is not -1;
// otherwise returns "Gluten_Stage_<stageId>_TID_<taskId>_VTID_<vId>".
//
// NOTE(review): when an execution id is present the result depends on the
// execution id alone, so every task of that execution gets the same string.
// Confirm that callers do not rely on per-task uniqueness.
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
  const bool executionIdUnavailable = (taskInfo.executionId == -1);
  if (executionIdUnavailable) {
    // fmt renders integral arguments in decimal, same text as std::to_string.
    return fmt::format(
        "Gluten_Stage_{}_TID_{}_VTID_{}", taskInfo.stageId, taskInfo.taskId, taskInfo.vId);
  }
  return fmt::format("Gluten_Execution_{}", taskInfo.executionId);
}
} // namespace
Comment on lines +75 to +86

WholeStageResultIterator::WholeStageResultIterator(
VeloxMemoryManager* memoryManager,
const std::shared_ptr<const facebook::velox::core::PlanNode>& planNode,
Expand Down Expand Up @@ -111,11 +124,7 @@ WholeStageResultIterator::WholeStageResultIterator(
velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
task_ = velox::exec::Task::create(
fmt::format(
"Gluten_Stage_{}_TID_{}_VTID_{}",
std::to_string(taskInfo_.stageId),
std::to_string(taskInfo_.taskId),
std::to_string(taskInfo.vId)),
getVeloxTaskId(taskInfo_),
std::move(planFragment),
0,
std::move(queryCtx),
Expand Down Expand Up @@ -233,11 +242,7 @@ std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQ
gluten::VeloxBackend::get()->getAsyncDataCache(),
memoryManager_->getAggregateMemoryPool(),
spillExecutor_,
fmt::format(
"Gluten_Stage_{}_TID_{}_VTID_{}",
std::to_string(taskInfo_.stageId),
std::to_string(taskInfo_.taskId),
std::to_string(taskInfo_.vId)));
getVeloxTaskId(taskInfo_));
return ctx;
Comment on lines 243 to 246
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public class NativePlanEvaluator {
private static final Logger LOGGER = LoggerFactory.getLogger(NativePlanEvaluator.class);
private static final AtomicInteger id = new AtomicInteger(0);
private static final long INVALID_EXECUTION_ID = -1L;
private static final String SPARK_EXECUTION_ID_KEY = "spark.sql.execution.id";

private final Runtime runtime;
private final PlanEvaluatorJniWrapper jniWrapper;
Expand Down Expand Up @@ -78,14 +80,16 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator(
int partitionIndex,
String spillDirPath)
throws RuntimeException {
final TaskContext taskContext = TaskContext.get();
final long itrHandle =
jniWrapper.nativeCreateKernelWithIterator(
wsPlan,
splitInfo,
iterList,
TaskContext.get().stageId(),
taskContext.stageId(),
partitionIndex, // TaskContext.getPartitionId(),
TaskContext.get().taskAttemptId(),
taskContext.taskAttemptId(),
getExecutionId(taskContext),
DebugUtil.isDumpingEnabledForTask(),
spillDirPath);
final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
Expand Down Expand Up @@ -113,4 +117,18 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
/**
 * Wraps a native iterator handle in a {@link ColumnarBatchOutIterator}.
 *
 * @param runtime the runtime passed through to the iterator constructor
 * @param itrHandle the native iterator handle passed through to the iterator constructor
 * @return a newly constructed iterator for the given runtime and handle
 */
private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) {
  final ColumnarBatchOutIterator outIterator = new ColumnarBatchOutIterator(runtime, itrHandle);
  return outIterator;
}

/**
 * Reads the local property stored under {@code SPARK_EXECUTION_ID_KEY} from the task context and
 * parses it as a long.
 *
 * @param taskContext the task context whose local property is read
 * @return the parsed execution id, or {@code INVALID_EXECUTION_ID} when the property is absent or
 *     is not a valid long (the latter case is logged at WARN level)
 */
private static long getExecutionId(TaskContext taskContext) {
  final String rawExecutionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY);
  if (rawExecutionId != null) {
    try {
      return Long.parseLong(rawExecutionId);
    } catch (NumberFormatException nfe) {
      // Malformed value: report it and fall through to the sentinel below.
      LOGGER.warn(
          "Invalid Spark execution id '{}', fallback to {}", rawExecutionId, INVALID_EXECUTION_ID);
    }
  }
  return INVALID_EXECUTION_ID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public native long nativeCreateKernelWithIterator(
int stageId,
int partitionId,
long taskId,
long executionId,
boolean enableDumping,
String spillDir)
throws RuntimeException;
Expand Down
Loading