3232
3333@ Slf4j
3434class RedisEnterpriseSourceTaskIT extends AbstractTestcontainersRedisTestBase {
35-
35+
3636 @ Container
37- private static final RedisContainer REDIS = new RedisContainer ().withKeyspaceNotifications ();
37+ private static final RedisContainer REDIS = new RedisContainer (
38+ RedisContainer .DEFAULT_IMAGE_NAME .withTag (RedisContainer .DEFAULT_TAG )).withKeyspaceNotifications ();
3839
3940 @ Override
4041 protected Collection <RedisServer > servers () {
@@ -53,16 +54,16 @@ private void startTask(RedisTestContext redis, String... props) {
5354 config .put (RedisEnterpriseSourceConfig .REDIS_URI_CONFIG , redis .getServer ().getRedisURI ());
5455 task .start (config );
5556 }
56-
57- protected Map <String , String > map (String ... args ) {
58- Assert .notNull (args , "Args cannot be null" );
59- Assert .isTrue (args .length % 2 == 0 , "Args length is not a multiple of 2" );
60- Map <String , String > body = new LinkedHashMap <>();
61- for (int index = 0 ; index < args .length / 2 ; index ++) {
62- body .put (args [index * 2 ], args [index * 2 + 1 ]);
63- }
64- return body ;
65- }
57+
58+ protected Map <String , String > map (String ... args ) {
59+ Assert .notNull (args , "Args cannot be null" );
60+ Assert .isTrue (args .length % 2 == 0 , "Args length is not a multiple of 2" );
61+ Map <String , String > body = new LinkedHashMap <>();
62+ for (int index = 0 ; index < args .length / 2 ; index ++) {
63+ body .put (args [index * 2 ], args [index * 2 + 1 ]);
64+ }
65+ return body ;
66+ }
6667
6768 @ AfterEach
6869 public void teardown () {
@@ -74,9 +75,10 @@ public void teardown() {
7475 void pollStream (RedisTestContext redis ) throws InterruptedException {
7576 final String stream = "stream1" ;
7677 final String topicPrefix = "testprefix-" ;
77- startTask (redis , RedisEnterpriseSourceConfig .TOPIC_CONFIG , topicPrefix + RedisEnterpriseSourceConfig .TOKEN_STREAM ,
78- RedisEnterpriseSourceConfig .READER_CONFIG , RedisEnterpriseSourceConfig .ReaderType .STREAM .name (),
79- RedisEnterpriseSourceConfig .STREAM_NAME_CONFIG , stream );
78+ startTask (redis , RedisEnterpriseSourceConfig .TOPIC_CONFIG ,
79+ topicPrefix + RedisEnterpriseSourceConfig .TOKEN_STREAM , RedisEnterpriseSourceConfig .READER_CONFIG ,
80+ RedisEnterpriseSourceConfig .ReaderType .STREAM .name (), RedisEnterpriseSourceConfig .STREAM_NAME_CONFIG ,
81+ stream );
8082 String field1 = "field1" ;
8183 String value1 = "value1" ;
8284 String field2 = "field2" ;
@@ -107,8 +109,8 @@ private void assertEquals(String expectedId, Map<String, String> expectedBody, S
107109 void pollKeys (RedisTestContext redis ) throws InterruptedException {
108110 String topic = "mytopic" ;
109111 startTask (redis , RedisEnterpriseSourceConfig .READER_CONFIG , RedisEnterpriseSourceConfig .ReaderType .KEYS .name (),
110- RedisEnterpriseSourceConfig .STREAM_NAME_CONFIG , "dummy" , RedisEnterpriseSourceConfig .TOPIC_CONFIG , topic ,
111- RedisEnterpriseSourceTask .KEYS_IDLE_TIMEOUT , "3000" );
112+ RedisEnterpriseSourceConfig .STREAM_NAME_CONFIG , "dummy" , RedisEnterpriseSourceConfig .TOPIC_CONFIG ,
113+ topic , RedisEnterpriseSourceTask .KEYS_IDLE_TIMEOUT , "3000" );
112114 LiveRedisItemReader <String , DataStructure <String >> reader = ((KeySourceRecordReader ) task .getReader ())
113115 .getReader ();
114116 Awaitility .await ().until (reader ::isOpen );
0 commit comments