1616
1717package org .springframework .kafka .test ;
1818
19- import static org .assertj .core .api .Assertions .assertThat ;
20-
19+ import java .io .IOException ;
20+ import java .io .UncheckedIOException ;
21+ import java .nio .file .Files ;
2122import java .time .Duration ;
2223import java .util .ArrayList ;
2324import java .util .Arrays ;
2930import java .util .Map ;
3031import java .util .Properties ;
3132import java .util .Set ;
33+ import java .util .UUID ;
3234import java .util .concurrent .TimeUnit ;
3335import java .util .concurrent .atomic .AtomicBoolean ;
3436import java .util .stream .Collectors ;
@@ -212,6 +214,7 @@ public void afterPropertiesSet() {
212214 this .zookeeperClient = new ZkClient (this .zkConnect , zkSessionTimeout , zkConnectionTimeout ,
213215 ZKStringSerializer$ .MODULE$ );
214216 this .kafkaServers .clear ();
217+ boolean userLogDir = this .brokerProperties .get (KafkaConfig .LogDirProp ()) != null && this .count == 1 ;
215218 for (int i = 0 ; i < this .count ; i ++) {
216219 Properties brokerConfigProperties = createBrokerProperties (i );
217220 brokerConfigProperties .setProperty (KafkaConfig .ReplicaSocketTimeoutMsProp (), "1000" );
@@ -222,6 +225,9 @@ public void afterPropertiesSet() {
222225 if (this .brokerProperties != null ) {
223226 this .brokerProperties .forEach (brokerConfigProperties ::put );
224227 }
228+ if (!userLogDir ) {
229+ logDir (brokerConfigProperties );
230+ }
225231 KafkaServer server = TestUtils .createServer (new KafkaConfig (brokerConfigProperties ), Time .SYSTEM );
226232 this .kafkaServers .add (server );
227233 if (this .kafkaPorts [i ] == 0 ) {
@@ -237,6 +243,16 @@ public void afterPropertiesSet() {
237243 System .setProperty (SPRING_EMBEDDED_ZOOKEEPER_CONNECT , getZookeeperConnectionString ());
238244 }
239245
246+ private void logDir (Properties brokerConfigProperties ) {
247+ try {
248+ brokerConfigProperties .put (KafkaConfig .LogDirProp (),
249+ Files .createTempDirectory ("spring.kafka." + UUID .randomUUID ()).toString ());
250+ }
251+ catch (IOException e ) {
252+ throw new UncheckedIOException (e );
253+ }
254+ }
255+
240256 private void overrideExitMethods () {
241257 String exitMsg = "Exit.%s(%d, %s) called" ;
242258 Exit .setExitProcedure ((statusCode , message ) -> {
@@ -478,11 +494,12 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
478494 * @param topics the topics.
479495 */
480496 public void consumeFromEmbeddedTopics (Consumer <?, ?> consumer , String ... topics ) {
481- HashSet <String > diff = new HashSet <>(Arrays .asList (topics ));
482- diff .removeAll (new HashSet <>(this .topics ));
483- assertThat (this .topics )
484- .as ("topic(s):'" + diff + "' are not in embedded topic list" )
485- .containsAll (new HashSet <>(Arrays .asList (topics )));
497+ List <String > notEmbedded = Arrays .stream (topics )
498+ .filter (topic -> !this .topics .contains (topic ))
499+ .collect (Collectors .toList ());
500+ if (notEmbedded .size () > 0 ) {
501+ throw new IllegalStateException ("topic(s):'" + notEmbedded + "' are not in embedded topic list" );
502+ }
486503 final AtomicBoolean assigned = new AtomicBoolean ();
487504 consumer .subscribe (Arrays .asList (topics ), new ConsumerRebalanceListener () {
488505
@@ -516,9 +533,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
516533 }
517534 consumer .seekToBeginning (records .partitions ());
518535 }
519- assertThat ( assigned .get ())
520- . as ("Failed to be assigned partitions from the embedded topics" )
521- . isTrue ();
536+ if (! assigned .get ()) {
537+ throw new IllegalStateException ("Failed to be assigned partitions from the embedded topics" );
538+ }
522539 logger .debug ("Subscription Initiated" );
523540 }
524541
0 commit comments