Skip to content

Commit f245593

Browse files
committed
Add synchronized decorators for non-stream item readers/writers
Resolves #4368
1 parent 14b6c66 commit f245593

File tree

8 files changed

+474
-0
lines changed

8 files changed

+474
-0
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import java.util.concurrent.locks.Lock;
19+
import java.util.concurrent.locks.ReentrantLock;
20+
21+
import org.springframework.batch.item.ItemReader;
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* This is an {@link ItemReader} decorator with a synchronized {@link ItemReader#read}
27+
* method. This decorator is useful when using a non thread-safe item reader in a
28+
* multi-threaded step.
29+
*
30+
* @author Mahmoud Ben Hassine
31+
* @since 5.1.0
32+
* @param <T> type of objects to read
33+
*/
34+
public class SynchronizedItemReader<T> implements ItemReader<T> {
35+
36+
private final ItemReader<T> delegate;
37+
38+
private final Lock lock = new ReentrantLock();
39+
40+
public SynchronizedItemReader(ItemReader<T> delegate) {
41+
Assert.notNull(delegate, "The delegate must not be null");
42+
this.delegate = delegate;
43+
}
44+
45+
/**
46+
* This method delegates to the {@code read} method of the delegate and is
47+
* synchronized with a lock.
48+
*/
49+
@Nullable
50+
public T read() throws Exception {
51+
this.lock.lock();
52+
try {
53+
return this.delegate.read();
54+
}
55+
finally {
56+
this.lock.unlock();
57+
}
58+
}
59+
60+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import java.util.concurrent.locks.Lock;
19+
import java.util.concurrent.locks.ReentrantLock;
20+
21+
import org.springframework.batch.item.Chunk;
22+
import org.springframework.batch.item.ItemWriter;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* This is an {@link ItemWriter} decorator with a synchronized {@link ItemWriter#write}
27+
* method. This decorator is useful when using a non thread-safe item writer in a
28+
* multi-threaded step.
29+
*
30+
* @author Mahmoud Ben Hassine
31+
* @since 5.1.0
32+
* @param <T> type of objects to write
33+
*/
34+
public class SynchronizedItemWriter<T> implements ItemWriter<T> {
35+
36+
private final ItemWriter<T> delegate;
37+
38+
private final Lock lock = new ReentrantLock();
39+
40+
public SynchronizedItemWriter(ItemWriter<T> delegate) {
41+
Assert.notNull(delegate, "The delegate must not be null");
42+
this.delegate = delegate;
43+
}
44+
45+
/**
46+
* This method delegates to the {@code write} method of the delegate and is
47+
* synchronized with a lock.
48+
*/
49+
@Override
50+
public void write(Chunk<? extends T> items) throws Exception {
51+
this.lock.lock();
52+
try {
53+
this.delegate.write(items);
54+
}
55+
finally {
56+
this.lock.unlock();
57+
}
58+
}
59+
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.item.support.builder;
18+
19+
import org.springframework.batch.item.ItemReader;
20+
import org.springframework.batch.item.support.SynchronizedItemReader;
21+
import org.springframework.util.Assert;
22+
23+
/**
24+
* Builder for {@link SynchronizedItemReader}.
25+
*
26+
* @author Mahmoud Ben Hassine
27+
* @since 5.1.0
28+
*/
29+
public class SynchronizedItemReaderBuilder<T> {
30+
31+
private ItemReader<T> delegate;
32+
33+
/**
34+
* The item reader to use as a delegate.
35+
* @param delegate the delegate reader to set
36+
* @return this instance for method chaining
37+
*/
38+
public SynchronizedItemReaderBuilder<T> delegate(ItemReader<T> delegate) {
39+
this.delegate = delegate;
40+
41+
return this;
42+
}
43+
44+
/**
45+
* Returns a new {@link SynchronizedItemReader}.
46+
* @return a new {@link SynchronizedItemReader}
47+
*/
48+
public SynchronizedItemReader<T> build() {
49+
Assert.notNull(this.delegate, "A delegate is required");
50+
51+
return new SynchronizedItemReader<>(this.delegate);
52+
}
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.item.support.builder;
18+
19+
import org.springframework.batch.item.ItemWriter;
20+
import org.springframework.batch.item.support.SynchronizedItemWriter;
21+
import org.springframework.util.Assert;
22+
23+
/**
24+
* Builder for {@link SynchronizedItemWriter}.
25+
*
26+
* @author Mahmoud Ben Hassine
27+
* @since 5.1.0
28+
*/
29+
public class SynchronizedItemWriterBuilder<T> {
30+
31+
private ItemWriter<T> delegate;
32+
33+
/**
34+
* The item writer to use as a delegate.
35+
* @param delegate the delegate writer to set
36+
* @return this instance for method chaining
37+
*/
38+
public SynchronizedItemWriterBuilder<T> delegate(ItemWriter<T> delegate) {
39+
this.delegate = delegate;
40+
41+
return this;
42+
}
43+
44+
/**
45+
* Returns a new {@link SynchronizedItemWriter}.
46+
* @return a new {@link SynchronizedItemWriter}
47+
*/
48+
public SynchronizedItemWriter<T> build() {
49+
Assert.notNull(this.delegate, "A delegate is required");
50+
51+
return new SynchronizedItemWriter<>(this.delegate);
52+
}
53+
54+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import org.junit.jupiter.api.Assertions;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
21+
import org.mockito.Mock;
22+
import org.mockito.junit.jupiter.MockitoExtension;
23+
24+
import org.springframework.batch.item.ItemReader;
25+
26+
import static org.mockito.Mockito.verify;
27+
28+
/**
29+
* Test class for {@link SynchronizedItemReader}.
30+
*
31+
* @author Mahmoud Ben Hassine
32+
*/
33+
@ExtendWith(MockitoExtension.class)
34+
public class SynchronizedItemReaderTests {
35+
36+
@Mock
37+
private ItemReader<Object> delegate;
38+
39+
@Test
40+
void testDelegateReadIsCalled() throws Exception {
41+
// given
42+
SynchronizedItemReader<Object> synchronizedItemReader = new SynchronizedItemReader<>(this.delegate);
43+
44+
// when
45+
synchronizedItemReader.read();
46+
47+
// then
48+
verify(this.delegate).read();
49+
}
50+
51+
@Test
52+
void testNullDelegate() {
53+
// when
54+
IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class,
55+
() -> new SynchronizedItemReader<>(null));
56+
57+
// then
58+
Assertions.assertEquals("The delegate must not be null", exception.getMessage());
59+
}
60+
61+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import org.junit.jupiter.api.Assertions;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
21+
import org.mockito.Mock;
22+
import org.mockito.junit.jupiter.MockitoExtension;
23+
24+
import org.springframework.batch.item.Chunk;
25+
import org.springframework.batch.item.ItemWriter;
26+
27+
import static org.mockito.Mockito.verify;
28+
29+
/**
30+
* Test class for {@link SynchronizedItemWriter}.
31+
*
32+
* @author Mahmoud Ben Hassine
33+
*/
34+
@ExtendWith(MockitoExtension.class)
35+
public class SynchronizedItemWriterTests {
36+
37+
@Mock
38+
private ItemWriter<Object> delegate;
39+
40+
@Test
41+
void testDelegateWriteIsCalled() throws Exception {
42+
// given
43+
Chunk<Object> chunk = new Chunk<>();
44+
SynchronizedItemWriter<Object> synchronizedItemWriter = new SynchronizedItemWriter<>(this.delegate);
45+
46+
// when
47+
synchronizedItemWriter.write(chunk);
48+
49+
// then
50+
verify(this.delegate).write(chunk);
51+
}
52+
53+
@Test
54+
void testNullDelegate() {
55+
// when
56+
IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class,
57+
() -> new SynchronizedItemWriter<>(null));
58+
59+
// then
60+
Assertions.assertEquals("The delegate must not be null", exception.getMessage());
61+
}
62+
63+
}

0 commit comments

Comments
 (0)