2626import java .nio .file .Paths ;
2727import java .util .HashMap ;
2828import java .util .Map ;
29+ import java .util .regex .Pattern ;
2930
3031import org .apache .kafka .streams .StreamsBuilder ;
3132import org .apache .kafka .streams .StreamsConfig ;
33+ import org .apache .kafka .streams .kstream .KStream ;
34+ import org .apache .kafka .streams .kstream .KTable ;
35+ import org .apache .kafka .streams .kstream .Materialized ;
3236import org .junit .jupiter .api .BeforeAll ;
3337import org .junit .jupiter .api .Test ;
3438import org .junit .jupiter .api .condition .DisabledOnOs ;
@@ -76,7 +80,6 @@ public static void setup() throws IOException {
7680
7781 @ Test
7882 public void testCleanupStreams () throws IOException {
79- Files .createDirectory (Paths .get (stateStoreDir .toString (), APPLICATION_ID ));
8083 Path stateStore = Files .createDirectory (Paths .get (stateStoreDir .toString (), APPLICATION_ID , "0_0" ));
8184 assertThat (stateStore ).exists ();
8285 streamsBuilderFactoryBean .stop ();
@@ -97,6 +100,8 @@ protected StreamsBuilder createInstance() {
97100 }
98101 };
99102 streamsBuilderFactoryBean .afterPropertiesSet ();
103+ StreamsBuilder builder = streamsBuilderFactoryBean .getObject ();
104+ builder .stream (Pattern .compile ("foo" ));
100105 streamsBuilderFactoryBean .start ();
101106 StreamsBuilder streamsBuilder = streamsBuilderFactoryBean .getObject ();
102107 verify (streamsBuilder ).build (kafkaStreamsConfiguration .asProperties ());
@@ -125,6 +130,13 @@ public KafkaStreamsConfiguration kStreamsConfigs() {
125130 return new KafkaStreamsConfiguration (props );
126131 }
127132
133+ @ Bean
134+ public KTable <?, ?> table (StreamsBuilder builder ) {
135+ KStream <Object , Object > stream = builder .stream (Pattern .compile ("foo" ));
136+ return stream .groupByKey ()
137+ .count (Materialized .as ("store" ));
138+
139+ }
128140 }
129141
130142}
0 commit comments