-
Notifications
You must be signed in to change notification settings - Fork 749
Independent Dynamic Scaling for different Activities in Temporal WorkFlow #4159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
0ca2cb7
383a7cd
a428b4b
7b98621
af5e22b
c6f8647
4bdc43a
c31b0ca
bb49427
85959e3
82b4dc0
04b2b6c
3528f88
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |||||||||||||
| package org.apache.gobblin.temporal; | ||||||||||||||
|
|
||||||||||||||
| import org.apache.gobblin.annotation.Alpha; | ||||||||||||||
| import org.apache.gobblin.temporal.ddm.worker.ExecutionWorker; | ||||||||||||||
| import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldJobLauncher; | ||||||||||||||
| import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker; | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -29,14 +30,20 @@ | |||||||||||||
| public interface GobblinTemporalConfigurationKeys { | ||||||||||||||
|
|
||||||||||||||
| String PREFIX = "gobblin.temporal."; | ||||||||||||||
| String STAGE_SPECIFIC_PREFIX = PREFIX + "stage."; | ||||||||||||||
|
|
||||||||||||||
| String WORKER_CLASS = PREFIX + "worker.class"; | ||||||||||||||
| String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName(); | ||||||||||||||
| String EXECUTION_WORKER_CLASS = ExecutionWorker.class.getName(); | ||||||||||||||
| String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace"; | ||||||||||||||
| String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace"; | ||||||||||||||
|
|
||||||||||||||
| String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name"; | ||||||||||||||
| String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue"; | ||||||||||||||
|
|
||||||||||||||
| // Execution task queue for work execution specialization | ||||||||||||||
| String EXECUTION_TASK_QUEUE = PREFIX + "execution.task.queue.name"; | ||||||||||||||
| String DEFAULT_EXECUTION_TASK_QUEUE = "GobblinTemporalExecutionQueue"; | ||||||||||||||
| String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher."; | ||||||||||||||
| String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class"; | ||||||||||||||
| String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = HelloWorldJobLauncher.class.getName(); | ||||||||||||||
|
|
@@ -134,4 +141,8 @@ public interface GobblinTemporalConfigurationKeys { | |||||||||||||
| String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts"; | ||||||||||||||
| int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4; | ||||||||||||||
|
|
||||||||||||||
|
||||||||||||||
| /** | |
| * Memory allocation (in megabytes) for execution worker containers when dynamic scaling is enabled. | |
| * This value determines the amount of memory assigned to each worker container during execution. | |
| */ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -238,6 +238,7 @@ public void start() | |
| for (int i = 0; i < this.numTemporalWorkers; i++) { | ||
| workers.add(initiateWorker()); | ||
| } | ||
| initializeExecutionWorkers(); | ||
| }catch (Exception e) { | ||
| logger.info(e + " for initiate workers"); | ||
| throw new RuntimeException(e); | ||
|
|
@@ -252,8 +253,8 @@ private TemporalWorker initiateWorker() throws Exception { | |
| WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance( | ||
| managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); | ||
|
|
||
| String workerClassName = ConfigUtils.getString(clusterConfig, | ||
| GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); | ||
| String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, | ||
| GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); | ||
| logger.info("Creating worker - class: '{}'", workerClassName); | ||
| Config workerConfig = clusterConfig; | ||
| TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor( | ||
|
|
@@ -263,6 +264,38 @@ private TemporalWorker initiateWorker() throws Exception { | |
| return worker; | ||
| } | ||
|
|
||
| private void initializeExecutionWorkers() throws Exception { | ||
| boolean dynamicScalingEnabled = ConfigUtils.getBoolean(clusterConfig, | ||
| GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false); | ||
|
|
||
| if (!dynamicScalingEnabled) { | ||
| return; | ||
| } | ||
|
|
||
| String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, | ||
| GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); | ||
| boolean isExecutionWorkerContainer = GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS.equals(workerClassName); | ||
|
|
||
| // only the initial container (WorkFulfillment worker) should start an additional ExecutionWorker worker | ||
| if (isExecutionWorkerContainer) { | ||
| return; | ||
| } | ||
|
|
||
| logger.info("Starting additional ExecutionWorker in initial container"); | ||
|
|
||
| String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE, | ||
| GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); | ||
| WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance( | ||
| managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); | ||
|
|
||
| TemporalWorker executionWorker = GobblinConstructorUtils.invokeLongestConstructor( | ||
| (Class<TemporalWorker>)Class.forName(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS), | ||
| clusterConfig, client); | ||
| executionWorker.start(); | ||
| workers.add(executionWorker); | ||
| logger.info("Worker started for class: {}", GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we tested the scenario with 1 container? |
||
|
|
||
| private void initMetricReporter() { | ||
| if (this.containerMetrics.isPresent()) { | ||
| try { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,86 @@ | ||||||
| /* | ||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||
| * this work for additional information regarding copyright ownership. | ||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
| * (the "License"); you may not use this file except in compliance with | ||||||
| * the License. You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| * See the License for the specific language governing permissions and | ||||||
| * limitations under the License. | ||||||
| */ | ||||||
|
|
||||||
| package org.apache.gobblin.temporal.ddm.worker; | ||||||
|
|
||||||
| import java.util.concurrent.TimeUnit; | ||||||
|
|
||||||
| import com.typesafe.config.Config; | ||||||
|
|
||||||
| import io.temporal.client.WorkflowClient; | ||||||
| import io.temporal.worker.WorkerOptions; | ||||||
|
|
||||||
| import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; | ||||||
| import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; | ||||||
| import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; | ||||||
| import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; | ||||||
| import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; | ||||||
| import org.apache.gobblin.util.ConfigUtils; | ||||||
|
|
||||||
|
|
||||||
| /** | ||||||
| * Specialized worker for Work Execution stage. | ||||||
| * This worker only registers activities for: | ||||||
| * - ProcessWorkUnit (Work Execution) | ||||||
| * | ||||||
| * Runs on containers with stage-specific memory for work execution operations. | ||||||
| * Polls the execution task queue to ensure activities run on appropriately-sized containers. | ||||||
| */ | ||||||
| public class ExecutionWorker extends AbstractTemporalWorker { | ||||||
| public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; | ||||||
| public int maxExecutionConcurrency; | ||||||
|
||||||
| public int maxExecutionConcurrency; | |
| private int maxExecutionConcurrency; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this config be process workunit specific instead of using a generic config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it makes sense to have different configs for this? Currently this is being driven by TEMPORAL_NUM_THREADS_PER_WORKER, but suppose If I want to reduce the number of threads in the worker but don't want to reduce the execution local activity concurrenncy.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.gobblin.temporal.ddm.workflow; | ||
|
|
||
| import com.typesafe.config.Config; | ||
| import lombok.Getter; | ||
|
|
||
| import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; | ||
|
|
||
| /** | ||
| * Represents the different stages of a Gobblin Temporal workflow. | ||
| * | ||
| * <p>Stages: | ||
| * <ul> | ||
| * <li>WORK_DISCOVERY: Discovers data sources, generates work units (uses default queue)</li> | ||
| * <li>WORK_EXECUTION: Processes work units to transform and load data (uses execution queue when dynamic scaling enabled)</li> | ||
| * <li>COMMIT: Commits work units (uses default queue)</li> | ||
| * </ul> | ||
| * | ||
| * <p>Queue routing: | ||
| * <ul> | ||
| * <li>Dynamic scaling OFF: All stages use default queue</li> | ||
| * <li>Dynamic scaling ON: WORK_EXECUTION uses dedicated execution queue, others use default queue</li> | ||
| * </ul> | ||
| */ | ||
| @Getter | ||
| public enum WorkflowStage { | ||
| WORK_DISCOVERY("workDiscovery", GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, | ||
| GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE), | ||
| WORK_EXECUTION("workExecution", GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE, | ||
| GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE), | ||
| COMMIT("commit", GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, | ||
| GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); | ||
|
|
||
| private final String profileBaseName; | ||
| private final String taskQueueConfigKey; | ||
| private final String defaultTaskQueue; | ||
|
|
||
| WorkflowStage(String profileBaseName, String taskQueueConfigKey, String defaultTaskQueue) { | ||
| this.profileBaseName = profileBaseName; | ||
| this.taskQueueConfigKey = taskQueueConfigKey; | ||
| this.defaultTaskQueue = defaultTaskQueue; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the task queue for this stage, reading from config or using default. | ||
| * Example: "GobblinTemporalDiscoveryCommitQueue", "GobblinTemporalExecutionQueue" | ||
| * | ||
| * @param config the configuration to read from | ||
| * @return the task queue name for this stage | ||
| */ | ||
| public String getTaskQueue(Config config) { | ||
| return config.hasPath(taskQueueConfigKey) | ||
| ? config.getString(taskQueueConfigKey) | ||
| : defaultTaskQueue; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove the whitespace change