Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit 80562b9

Browse files
cppwfsmarkpollack
authored andcommitted
Allows user to specify different CTR app when launching composed tasks
resolves #2290 Updated based on request to change name in shell
1 parent cbfd5b4 commit 80562b9

File tree

17 files changed

+252
-21
lines changed

17 files changed

+252
-21
lines changed

spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/TaskOperations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ public interface TaskOperations {
6565
* @param name the name of the task
6666
* @param properties the deployment properties
6767
* @param arguments the command line arguments
68+
* @param alternateComposedTaskRunnerApp app to use when running composed tasks instead of default
6869
* @return long containing the TaskExecutionId
6970
*/
70-
long launch(String name, Map<String, String> properties, List<String> arguments);
71+
long launch(String name, Map<String, String> properties, List<String> arguments, String alternateComposedTaskRunnerApp);
7172

7273
/**
7374
* Request the stop of a group {@link org.springframework.cloud.task.repository.TaskExecution}s.

spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/TaskTemplate.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,13 @@ public TaskDefinitionResource create(String name, String definition) {
152152
}
153153

154154
@Override
155-
public long launch(String name, Map<String, String> properties, List<String> arguments) {
155+
public long launch(String name, Map<String, String> properties, List<String> arguments, String alternateComposedTaskRunnerApp) {
156156
MultiValueMap<String, Object> values = new LinkedMultiValueMap<>();
157157
values.add("properties", DeploymentPropertiesUtils.format(properties));
158158
values.add("arguments", StringUtils.collectionToDelimitedString(arguments, " "));
159+
if(alternateComposedTaskRunnerApp != null) {
160+
values.add("ctrname", alternateComposedTaskRunnerApp);
161+
}
159162
return restTemplate.postForObject(executionByNameLink.expand(name).getHref(), values, Long.class, name);
160163
}
161164

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.dataflow.server.controller;
18+
19+
/**
20+
* Thrown when a user attempts to launch a non composed task while specifying a composed task runner name.
21+
*
22+
* @author Glenn Renfro
23+
*/
24+
public class InvalidCTRLaunchRequestException extends RuntimeException {
25+
26+
public InvalidCTRLaunchRequestException(String taskName) {
27+
super(String.format("Can not specify a Composed Task Runner Name when launching a non composed task definition (%s)", taskName));
28+
}
29+
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/RestControllerAdvice.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public VndErrors onException(Exception e) {
9797
*/
9898
@ExceptionHandler({ AppAlreadyRegisteredException.class, DuplicateStreamDefinitionException.class,
9999
DuplicateTaskException.class, StreamAlreadyDeployedException.class, StreamAlreadyDeployingException.class,
100-
UnregisterAppException.class})
100+
UnregisterAppException.class, InvalidCTRLaunchRequestException.class})
101101
@ResponseStatus(HttpStatus.CONFLICT)
102102
@ResponseBody
103103
public VndErrors onConflictException(Exception e) {

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionController.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public PagedResources<TaskExecutionResource> retrieveTasksByName(@RequestParam("
153153
* path.
154154
*
155155
* @param taskName the name of the existing task to be executed (required)
156+
* @param ctrname user specified name of a ctr app if different than the default.
156157
* @param properties the runtime properties for the task, as a comma-delimited list of
157158
* key=value pairs
158159
* @param arguments the runtime commandline arguments
@@ -161,11 +162,12 @@ public PagedResources<TaskExecutionResource> retrieveTasksByName(@RequestParam("
161162
@RequestMapping(value = "", method = RequestMethod.POST, params = "name")
162163
@ResponseStatus(HttpStatus.CREATED)
163164
public long launch(@RequestParam("name") String taskName,
165+
@RequestParam(required = false) String ctrname,
164166
@RequestParam(required = false) String properties,
165167
@RequestParam(required = false) String arguments) {
166168
Map<String, String> propertiesToUse = DeploymentPropertiesUtils.parse(properties);
167169
List<String> argumentsToUse = DeploymentPropertiesUtils.parseParamList(arguments, " ");
168-
return this.taskExecutionService.executeTask(taskName, propertiesToUse, argumentsToUse);
170+
return this.taskExecutionService.executeTask(taskName, propertiesToUse, argumentsToUse, ctrname);
169171
}
170172

171173
/**

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionInfoService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,19 @@
2828
* @author Daniel Serleg
2929
* @author Mark Pollack
3030
* @author David Turanski
31+
* @author Glenn Renfro
3132
*/
3233
public interface TaskExecutionInfoService {
3334

35+
/**
36+
* Create a the {@link TaskExecutionInformation} instance for the information provided.
37+
* @param taskName the name of the task definition
38+
* @param taskDeploymentProperties the deployment properties to use for the {@link TaskExecutionInformation}
39+
* @param composedTaskRunnerName user provided CTR app name to use.
40+
* @return instance of {@link TaskExecutionInformation}
41+
*/
3442
TaskExecutionInformation findTaskExecutionInformation(String taskName,
35-
Map<String, String> taskDeploymentProperties);
43+
Map<String, String> taskDeploymentProperties, String composedTaskRunnerName);
3644

3745
AllPlatformsTaskExecutionInformation findAllPlatformTaskExecutionInformation();
3846
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,18 @@ public interface TaskExecutionService {
3838
*
3939
* @param taskName Name of the task. Must not be null or empty.
4040
* @param taskDeploymentProperties Optional deployment properties. Must not be null.
41-
* @param commandLineArgs Optional runtime commandline arguments
41+
* @param commandLineArgs Optional runtime commandline argument
42+
* @param composedTaskRunnerName the name of the app the user would like to use if they don't want the default. If null default will be used.
43+
* @return the taskExecutionId for the executed task.
44+
*/
45+
long executeTask(String taskName, Map<String, String> taskDeploymentProperties, List<String> commandLineArgs, String composedTaskRunnerName);
46+
47+
/**
48+
* Execute a task with the provided task name and optional runtime properties.
49+
*
50+
* @param taskName Name of the task. Must not be null or empty.
51+
* @param taskDeploymentProperties Optional deployment properties. Must not be null.
52+
* @param commandLineArgs Optional runtime commandline argument
4253
* @return the taskExecutionId for the executed task.
4354
*/
4455
long executeTask(String taskName, Map<String, String> taskDeploymentProperties, List<String> commandLineArgs);

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionInfoService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
import org.springframework.cloud.dataflow.core.dsl.TaskNode;
2929
import org.springframework.cloud.dataflow.core.dsl.TaskParser;
3030
import org.springframework.cloud.dataflow.registry.service.AppRegistryService;
31+
import org.springframework.cloud.dataflow.server.controller.InvalidCTRLaunchRequestException;
32+
import org.springframework.cloud.dataflow.server.controller.NoSuchAppException;
3133
import org.springframework.cloud.dataflow.server.job.LauncherRepository;
3234
import org.springframework.cloud.dataflow.server.repository.NoSuchTaskDefinitionException;
3335
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
3436
import org.springframework.cloud.dataflow.server.service.TaskExecutionInfoService;
3537
import org.springframework.cloud.task.repository.TaskExplorer;
3638
import org.springframework.util.Assert;
39+
import org.springframework.util.StringUtils;
3740

3841
/**
3942
* Default implementation of the {@link DefaultTaskExecutionInfoService} interface.
@@ -110,7 +113,7 @@ public DefaultTaskExecutionInfoService(DataSourceProperties dataSourceProperties
110113

111114
@Override
112115
public TaskExecutionInformation findTaskExecutionInformation(String taskName,
113-
Map<String, String> taskDeploymentProperties) {
116+
Map<String, String> taskDeploymentProperties, String composedTaskRunnerName) {
114117
Assert.hasText(taskName, "The provided taskName must not be null or empty.");
115118
Assert.notNull(taskDeploymentProperties, "The provided runtimeProperties must not be null.");
116119

@@ -123,9 +126,15 @@ public TaskExecutionInformation findTaskExecutionInformation(String taskName,
123126
TaskNode taskNode = taskParser.parse();
124127
// if composed task definition replace definition with one composed task
125128
// runner and executable graph.
129+
if(!taskNode.isComposed() && StringUtils.hasText(composedTaskRunnerName)) {
130+
throw new InvalidCTRLaunchRequestException(taskName);
131+
}
126132
if (taskNode.isComposed()) {
133+
if(StringUtils.hasText(composedTaskRunnerName) && !this.appRegistryService.appExist(composedTaskRunnerName, ApplicationType.task)) {
134+
throw new NoSuchAppException(composedTaskRunnerName);
135+
}
127136
taskDefinition = new TaskDefinition(taskDefinition.getName(),
128-
TaskServiceUtils.createComposedTaskDefinition(
137+
TaskServiceUtils.createComposedTaskDefinition(composedTaskRunnerName,
129138
taskNode.toExecutableDSL(), taskConfigurationProperties));
130139
retData.setTaskDeploymentProperties(
131140
TaskServiceUtils.establishComposedTaskProperties(taskDeploymentProperties,

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,10 @@ public DefaultTaskExecutionService(LauncherRepository launcherRepository,
151151
this.dataflowTaskExecutionDao = dataflowTaskExecutionDao;
152152
}
153153

154+
155+
154156
@Override
155-
public long executeTask(String taskName, Map<String, String> taskDeploymentProperties, List<String> commandLineArgs) {
157+
public long executeTask(String taskName, Map<String, String> taskDeploymentProperties, List<String> commandLineArgs, String composedTaskRunnerName) {
156158

157159
String platformName = taskDeploymentProperties.get(TASK_PLATFORM_NAME);
158160
if (!StringUtils.hasText(platformName)) {
@@ -178,7 +180,8 @@ public long executeTask(String taskName, Map<String, String> taskDeploymentPrope
178180
}
179181
}
180182
TaskExecutionInformation taskExecutionInformation = taskExecutionInfoService
181-
.findTaskExecutionInformation(taskName, taskDeploymentProperties);
183+
.findTaskExecutionInformation(taskName, taskDeploymentProperties, composedTaskRunnerName);
184+
182185
TaskExecution taskExecution = taskExecutionRepositoryService.createTaskExecution(taskName);
183186

184187
AppDeploymentRequest request = this.taskAppDeploymentRequestCreator.
@@ -206,6 +209,11 @@ public long executeTask(String taskName, Map<String, String> taskDeploymentPrope
206209
return taskExecution.getExecutionId();
207210
}
208211

212+
@Override
213+
public long executeTask(String taskName, Map<String, String> taskDeploymentProperties, List<String> commandLineArgs) {
214+
return executeTask(taskName, taskDeploymentProperties, commandLineArgs, null);
215+
}
216+
209217
@Override
210218
public String getLog(String platformName, String taskId) {
211219
Launcher launcher = this.launcherRepository.findByName(platformName);

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,9 @@ public void restartJobExecution(long jobExecutionId) throws NoSuchJobExecutionEx
188188
if (platformName != null) {
189189
Map<String, String> deploymentProperties = new HashMap<>();
190190
deploymentProperties.put(DefaultTaskExecutionService.TASK_PLATFORM_NAME, platformName);
191+
String taskAppName = taskJobExecution.getJobExecution().getJobParameters().getString("-spring.cloud.data.flow.taskappname");
191192
taskExecutionService.executeTask(taskDefinition.getName(), deploymentProperties,
192-
taskExecution.getArguments());
193+
taskExecution.getArguments(), taskAppName);
193194
} else {
194195
throw new IllegalStateException(String.format("Did not find platform for taskName=[%s] , taskId=[%s]",
195196
taskExecution.getTaskName(),taskJobExecution.getTaskId()));

0 commit comments

Comments
 (0)