Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit d092a68

Browse files
cppwfsmarkpollack
authored andcommitted
Resolves Delayed Transaction commit on task launch[1.7.x]
1 parent 9135c59 commit d092a68

File tree

9 files changed

+364
-22
lines changed

9 files changed

+364
-22
lines changed

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@
4343
import org.springframework.cloud.dataflow.server.repository.DeploymentIdRepository;
4444
import org.springframework.cloud.dataflow.server.repository.RdbmsTaskDefinitionRepository;
4545
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
46+
import org.springframework.cloud.dataflow.server.service.TaskExecutionCreationService;
4647
import org.springframework.cloud.dataflow.server.service.TaskJobService;
4748
import org.springframework.cloud.dataflow.server.service.TaskService;
4849
import org.springframework.cloud.dataflow.server.service.TaskValidationService;
50+
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskExecutionCreationService;
4951
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskJobService;
5052
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskService;
5153
import org.springframework.cloud.dataflow.server.service.impl.TaskConfigurationProperties;
@@ -93,17 +95,22 @@ public TaskRepository taskRepository(DataSource dataSource) {
9395
return new SimpleTaskRepository(new TaskExecutionDaoFactoryBean(dataSource));
9496
}
9597

98+
@Bean
99+
public TaskExecutionCreationService taskExecutionRepositoryService(TaskRepository taskRepository) {
100+
return new DefaultTaskExecutionCreationService(taskRepository);
101+
}
102+
96103
@Bean
97104
@ConditionalOnBean(TaskDefinitionRepository.class)
98105
public TaskService taskService(TaskDefinitionRepository repository, TaskExplorer taskExplorer,
99106
TaskRepository taskExecutionRepository, AppRegistryCommon registry, TaskLauncher taskLauncher,
100107
ApplicationConfigurationMetadataResolver metadataResolver,
101108
TaskConfigurationProperties taskConfigurationProperties, DeploymentIdRepository deploymentIdRepository,
102109
AuditRecordService auditRecordService, CommonApplicationProperties commonApplicationProperties,
103-
TaskValidationService taskValidationService) {
110+
TaskValidationService taskValidationService, TaskExecutionCreationService taskExecutionCreationService) {
104111
return new DefaultTaskService(dataSourceProperties, repository, taskExplorer, taskExecutionRepository, registry,
105112
taskLauncher, metadataResolver, taskConfigurationProperties, deploymentIdRepository, auditRecordService,
106-
dataflowServerUri, commonApplicationProperties, taskValidationService);
113+
dataflowServerUri, commonApplicationProperties, taskValidationService, taskExecutionCreationService);
107114
}
108115

109116
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.dataflow.server.service;
18+
19+
import org.springframework.cloud.task.repository.TaskExecution;
20+
21+
/**
22+
* Offers features to allow users to create {@link TaskExecution}s.
23+
*/
24+
public interface TaskExecutionCreationService {
25+
26+
27+
/**
28+
* Creates a {@link TaskExecution} using the specified taskName
29+
* @param taskName the name to be associated with the {@link TaskExecution}
30+
* @return {@link TaskExecution}
31+
*/
32+
TaskExecution createTaskExecution(String taskName);
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.dataflow.server.service.impl;
18+
19+
import org.springframework.cloud.dataflow.server.service.TaskExecutionCreationService;
20+
import org.springframework.cloud.task.repository.TaskExecution;
21+
import org.springframework.cloud.task.repository.TaskRepository;
22+
import org.springframework.transaction.annotation.Propagation;
23+
import org.springframework.transaction.annotation.Transactional;
24+
import org.springframework.util.Assert;
25+
26+
@Transactional
27+
public class DefaultTaskExecutionCreationService implements TaskExecutionCreationService {
28+
29+
private TaskRepository taskRepository;
30+
31+
public DefaultTaskExecutionCreationService(TaskRepository taskRepository) {
32+
Assert.notNull(taskRepository, "taskRepository must not be null");
33+
this.taskRepository = taskRepository;
34+
}
35+
36+
@Override
37+
@Transactional(propagation = Propagation.REQUIRES_NEW)
38+
public TaskExecution createTaskExecution(String taskName) {
39+
return taskRepository.createTaskExecution(taskName);
40+
}
41+
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskService.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.cloud.dataflow.server.repository.DeploymentKey;
4444
import org.springframework.cloud.dataflow.server.repository.NoSuchTaskDefinitionException;
4545
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
46+
import org.springframework.cloud.dataflow.server.service.TaskExecutionCreationService;
4647
import org.springframework.cloud.dataflow.server.service.TaskService;
4748
import org.springframework.cloud.dataflow.server.service.TaskValidationService;
4849
import org.springframework.cloud.dataflow.server.service.ValidationStatus;
@@ -112,6 +113,8 @@ public class DefaultTaskService implements TaskService {
112113

113114
private final TaskValidationService taskValidationService;
114115

116+
private final TaskExecutionCreationService taskExecutionCreationService;
117+
115118
protected final AuditRecordService auditRecordService;
116119

117120
private final ArgumentSanitizer argumentSanitizer = new ArgumentSanitizer();
@@ -145,19 +148,21 @@ public DefaultTaskService(DataSourceProperties dataSourceProperties,
145148
TaskConfigurationProperties taskConfigurationProperties, DeploymentIdRepository deploymentIdRepository,
146149
AuditRecordService auditRecordService,
147150
String dataflowServerUri, CommonApplicationProperties commonApplicationProperties,
148-
TaskValidationService taskValidationService) {
149-
Assert.notNull(dataSourceProperties, "DataSourceProperties must not be null");
150-
Assert.notNull(taskDefinitionRepository, "TaskDefinitionRepository must not be null");
151-
Assert.notNull(taskExecutionRepository, "TaskExecutionRepository must not be null");
152-
Assert.notNull(taskExplorer, "TaskExplorer must not be null");
153-
Assert.notNull(registry, "UriRegistry must not be null");
154-
Assert.notNull(taskLauncher, "TaskLauncher must not be null");
151+
TaskValidationService taskValidationService,
152+
TaskExecutionCreationService taskExecutionCreationService) {
153+
Assert.notNull(dataSourceProperties, "dataSourceProperties must not be null");
154+
Assert.notNull(taskDefinitionRepository, "taskDefinitionRepository must not be null");
155+
Assert.notNull(taskExecutionRepository, "taskExecutionRepository must not be null");
156+
Assert.notNull(taskExplorer, "taskExplorer must not be null");
157+
Assert.notNull(registry, "uriRegistry must not be null");
158+
Assert.notNull(taskLauncher, "taskLauncher must not be null");
155159
Assert.notNull(metaDataResolver, "metaDataResolver must not be null");
156160
Assert.notNull(taskConfigurationProperties, "taskConfigurationProperties must not be null");
157161
Assert.notNull(deploymentIdRepository, "deploymentIdRepository must not be null");
158162
Assert.notNull(commonApplicationProperties, "commonApplicationProperties must not be null");
159163
Assert.notNull(auditRecordService, "auditRecordService must not be null");
160-
Assert.notNull(taskValidationService, "TaskValidationService must not be null");
164+
Assert.notNull(taskValidationService, "taskValidationService must not be null");
165+
Assert.notNull(taskExecutionCreationService, "taskExecutionRepositoryService must not be null");
161166
this.dataSourceProperties = dataSourceProperties;
162167
this.taskDefinitionRepository = taskDefinitionRepository;
163168
this.taskExecutionRepository = taskExecutionRepository;
@@ -171,6 +176,7 @@ public DefaultTaskService(DataSourceProperties dataSourceProperties,
171176
this.commonApplicationProperties = commonApplicationProperties;
172177
this.auditRecordService = auditRecordService;
173178
this.taskValidationService = taskValidationService;
179+
this.taskExecutionCreationService = taskExecutionCreationService;
174180
}
175181

176182
@Override
@@ -207,7 +213,7 @@ public long executeTask(String taskName, Map<String, String> taskDeploymentPrope
207213
Resource appResource = this.registry.getAppResource(appRegistration);
208214
Resource metadataResource = this.registry.getAppMetadataResource(appRegistration);
209215

210-
TaskExecution taskExecution = taskExecutionRepository.createTaskExecution(taskName);
216+
TaskExecution taskExecution = this.taskExecutionCreationService.createTaskExecution(taskName);
211217
taskDefinition = TaskServiceUtils.updateTaskProperties(taskDefinition,
212218
dataSourceProperties);
213219

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,11 @@
5252
import org.springframework.cloud.dataflow.server.repository.InMemoryDeploymentIdRepository;
5353
import org.springframework.cloud.dataflow.server.repository.InMemoryTaskDefinitionRepository;
5454
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
55+
import org.springframework.cloud.dataflow.server.service.TaskExecutionCreationService;
5556
import org.springframework.cloud.dataflow.server.service.TaskJobService;
5657
import org.springframework.cloud.dataflow.server.service.TaskService;
5758
import org.springframework.cloud.dataflow.server.service.TaskValidationService;
59+
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskExecutionCreationService;
5860
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskJobService;
5961
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskService;
6062
import org.springframework.cloud.dataflow.server.service.impl.TaskConfigurationProperties;
@@ -169,14 +171,21 @@ public TaskDefinitionRepository taskDefinitionRepository() {
169171
return new InMemoryTaskDefinitionRepository();
170172
}
171173

174+
@Bean
175+
public TaskExecutionCreationService taskExecutionRepositoryService(TaskRepository taskRepository) {
176+
return new DefaultTaskExecutionCreationService(taskRepository);
177+
}
178+
172179
@Bean
173180
public TaskService taskService(TaskDefinitionRepository repository, TaskExplorer explorer, AppRegistry registry,
174181
TaskLauncher taskLauncher, ApplicationConfigurationMetadataResolver metadataResolver,
175182
DeploymentIdRepository deploymentIdRepository, AuditRecordService auditRecordService,
176-
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService) {
183+
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService,
184+
TaskExecutionCreationService taskExecutionCreationService) {
177185
return new DefaultTaskService(new DataSourceProperties(), repository, explorer, taskRepository(), registry,
178186
taskLauncher, metadataResolver, new TaskConfigurationProperties(), deploymentIdRepository,
179-
auditRecordService, null, commonApplicationProperties, taskValidationService);
187+
auditRecordService, null, commonApplicationProperties, taskValidationService,
188+
taskExecutionCreationService);
180189
}
181190

182191
@Bean

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TaskServiceDependencies.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import org.springframework.cloud.dataflow.server.repository.support.DataflowRdbmsInitializer;
3939
import org.springframework.cloud.dataflow.server.service.SchedulerService;
4040
import org.springframework.cloud.dataflow.server.service.SchedulerServiceProperties;
41+
import org.springframework.cloud.dataflow.server.service.TaskExecutionCreationService;
4142
import org.springframework.cloud.dataflow.server.service.TaskValidationService;
4243
import org.springframework.cloud.dataflow.server.service.impl.DefaultSchedulerService;
44+
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskExecutionCreationService;
4345
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskService;
4446
import org.springframework.cloud.dataflow.server.service.impl.TaskConfigurationProperties;
4547
import org.springframework.cloud.dataflow.server.service.impl.validation.DefaultTaskValidationService;
@@ -173,16 +175,22 @@ public DataSourceTransactionManager transactionManager(DataSource dataSource) {
173175
return new DataSourceTransactionManager(dataSource);
174176
}
175177

178+
@Bean
179+
public TaskExecutionCreationService taskExecutionRepositoryService(TaskRepository taskRepository) {
180+
return new DefaultTaskExecutionCreationService(taskRepository);
181+
}
182+
176183
@Bean
177184
public DefaultTaskService defaultTaskService(TaskDefinitionRepository taskDefinitionRepository,
178185
TaskExplorer taskExplorer, TaskRepository taskExecutionRepository, AppRegistry appRegistry,
179186
TaskLauncher taskLauncher, ApplicationConfigurationMetadataResolver metadataResolver,
180187
TaskConfigurationProperties taskConfigurationProperties, AuditRecordService auditRecordService,
181-
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService) {
188+
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService,
189+
TaskExecutionCreationService taskExecutionCreationService) {
182190
return new DefaultTaskService(this.dataSourceProperties, taskDefinitionRepository, taskExplorer,
183191
taskExecutionRepository, appRegistry, taskLauncher, metadataResolver, taskConfigurationProperties,
184192
new InMemoryDeploymentIdRepository(), auditRecordService, null, commonApplicationProperties,
185-
taskValidationService);
193+
taskValidationService, taskExecutionCreationService);
186194
}
187195

188196
@Bean

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,14 @@
9090
import org.springframework.cloud.dataflow.server.service.SkipperStreamService;
9191
import org.springframework.cloud.dataflow.server.service.StreamService;
9292
import org.springframework.cloud.dataflow.server.service.StreamValidationService;
93+
import org.springframework.cloud.dataflow.server.service.TaskExecutionCreationService;
9394
import org.springframework.cloud.dataflow.server.service.TaskService;
9495
import org.springframework.cloud.dataflow.server.service.TaskValidationService;
9596
import org.springframework.cloud.dataflow.server.service.impl.AppDeployerStreamService;
9697
import org.springframework.cloud.dataflow.server.service.impl.AppDeploymentRequestCreator;
9798
import org.springframework.cloud.dataflow.server.service.impl.DefaultSchedulerService;
9899
import org.springframework.cloud.dataflow.server.service.impl.DefaultSkipperStreamService;
100+
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskExecutionCreationService;
99101
import org.springframework.cloud.dataflow.server.service.impl.DefaultTaskService;
100102
import org.springframework.cloud.dataflow.server.service.impl.TaskConfigurationProperties;
101103
import org.springframework.cloud.dataflow.server.service.impl.validation.DefaultStreamValidationService;
@@ -419,21 +421,24 @@ public List<ApplicationsMetrics> getMetrics() {
419421
public TaskDefinitionController taskDefinitionController(TaskExplorer explorer, TaskDefinitionRepository repository,
420422
DeploymentIdRepository deploymentIdRepository, ApplicationConfigurationMetadataResolver metadataResolver,
421423
AppRegistryCommon appRegistry, AuditRecordService auditRecordService,
422-
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService) {
424+
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService,
425+
TaskExecutionCreationService taskExecutionCreationService) {
423426
return new TaskDefinitionController(explorer, repository,
424427
taskService(metadataResolver, taskRepository(), deploymentIdRepository, appRegistry,
425428
/* delegatingResourceLoader, */auditRecordService, commonApplicationProperties,
426-
taskValidationService));
429+
taskValidationService, taskExecutionCreationService));
427430
}
428431

429432
@Bean
430433
public TaskExecutionController taskExecutionController(TaskExplorer explorer,
431434
ApplicationConfigurationMetadataResolver metadataResolver, DeploymentIdRepository deploymentIdRepository,
432435
AppRegistryCommon appRegistry, AuditRecordService auditRecordService,
433-
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService) {
436+
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService,
437+
TaskExecutionCreationService taskExecutionCreationService) {
434438
return new TaskExecutionController(
435439
explorer, taskService(metadataResolver, taskRepository(), deploymentIdRepository, appRegistry,
436-
auditRecordService, commonApplicationProperties, taskValidationService),
440+
auditRecordService, commonApplicationProperties,
441+
taskValidationService, taskExecutionCreationService),
437442
taskDefinitionRepository());
438443
}
439444

@@ -485,15 +490,21 @@ public TaskExplorer taskExplorer() {
485490
return mock(TaskExplorer.class);
486491
}
487492

493+
@Bean
494+
public TaskExecutionCreationService taskExecutionRepositoryService(TaskRepository taskRepository) {
495+
return new DefaultTaskExecutionCreationService(taskRepository);
496+
}
497+
488498
@Bean
489499
public TaskService taskService(ApplicationConfigurationMetadataResolver metadataResolver,
490500
TaskRepository taskExecutionRepository, DeploymentIdRepository deploymentIdRepository,
491501
AppRegistryCommon appRegistry, AuditRecordService auditRecordService,
492-
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService) {
502+
CommonApplicationProperties commonApplicationProperties, TaskValidationService taskValidationService,
503+
TaskExecutionCreationService taskExecutionCreationService) {
493504
return new DefaultTaskService(new DataSourceProperties(), taskDefinitionRepository(), taskExplorer(),
494505
taskExecutionRepository, appRegistry, taskLauncher(), metadataResolver,
495506
new TaskConfigurationProperties(), deploymentIdRepository, auditRecordService, null,
496-
commonApplicationProperties, taskValidationService);
507+
commonApplicationProperties, taskValidationService, taskExecutionCreationService);
497508
}
498509

499510
@Bean

0 commit comments

Comments
 (0)