Skip to content

Commit 4ce7638

Browse files
committed
topic and table names are configurable, also test added.
1 parent 6134b13 commit 4ce7638

20 files changed

+877
-45
lines changed

flink-table-api/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,9 @@ dependencies {
3434
testImplementation(platform("org.junit:junit-bom:5.12.1"))
3535
testImplementation("org.junit.jupiter:junit-jupiter")
3636
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
37+
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.10.0") // Added for aligned JUnit dependencies
38+
39+
// Mockito for testing
40+
testImplementation("org.mockito:mockito-core:5.10.0")
41+
testImplementation("org.mockito:mockito-junit-jupiter:5.10.0")
3742
}

flink-table-api/src/main/java/io/confluent/developer/sql/config/ConfigLoader.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,32 @@ public static Properties loadKafkaProperties(String env) {
4747
return properties;
4848
}
4949

50+
/**
51+
* Gets a table name from properties, or returns the default name if not found.
52+
*
53+
* @param properties Properties to look up the table name in
54+
* @param key The key suffix to look for (will be prefixed with "table.")
55+
* @param defaultName The default table name to return if not found in properties
56+
* @return The table name from properties or the default
57+
*/
58+
public static String getTableName(Properties properties, String key, String defaultName) {
59+
String propertyKey = "table." + key;
60+
return properties.getProperty(propertyKey, defaultName);
61+
}
62+
63+
/**
64+
* Gets a topic name from properties, or returns the default name if not found.
65+
*
66+
* @param properties Properties to look up the topic name in
67+
* @param key The key suffix to look for (will be prefixed with "topic.")
68+
* @param defaultName The default topic name to return if not found in properties
69+
* @return The topic name from properties or the default
70+
*/
71+
public static String getTopicName(Properties properties, String key, String defaultName) {
72+
String propertyKey = "topic." + key;
73+
return properties.getProperty(propertyKey, defaultName);
74+
}
75+
5076
/**
5177
* Resolves environment variables in property values.
5278
* Replaces ${ENV_VAR} with the corresponding environment variable value.

flink-table-api/src/main/java/io/confluent/developer/sql/usecases/AirlineDelayAnalytics.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.confluent.developer.sql.usecases;
22

3+
import io.confluent.developer.sql.config.ConfigLoader;
34
import io.confluent.developer.sql.table.FlightTableApiFactory;
45
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
56
import org.apache.flink.table.api.Table;
@@ -19,12 +20,14 @@
1920
*/
2021
public class AirlineDelayAnalytics {
2122
private static final Logger LOG = LoggerFactory.getLogger(AirlineDelayAnalytics.class);
22-
private static final String TABLE_NAME = "flight_source";
2323

2424
private final StreamExecutionEnvironment streamEnv;
2525
private final StreamTableEnvironment tableEnv;
2626
private final Properties kafkaProperties;
2727
private final String topic;
28+
private final String tableName;
29+
private final String delayPerformanceTableName;
30+
private final String hourlyDelaysTableName;
2831

2932
/**
3033
* Creates a new AirlineDelayAnalytics instance.
@@ -43,6 +46,12 @@ public AirlineDelayAnalytics(
4346
this.tableEnv = tableEnv;
4447
this.kafkaProperties = kafkaProperties;
4548
this.topic = topic;
49+
this.tableName = ConfigLoader.getTableName(kafkaProperties, "flights", "Flights");
50+
this.delayPerformanceTableName = ConfigLoader.getTableName(kafkaProperties, "airline-delay-performance", "AirlineDelayPerformance");
51+
this.hourlyDelaysTableName = ConfigLoader.getTableName(kafkaProperties, "hourly-delays", "HourlyDelays");
52+
53+
LOG.info("Using table name: {}, delay performance table: {}, hourly delays table: {}",
54+
tableName, delayPerformanceTableName, hourlyDelaysTableName);
4655
}
4756

4857
/**
@@ -56,7 +65,7 @@ public Table processDelayPerformance() {
5665
// Create flight table using the Table API
5766
Table flightTable = FlightTableApiFactory.createFlightTable(
5867
tableEnv,
59-
TABLE_NAME,
68+
tableName,
6069
topic,
6170
kafkaProperties
6271
);
@@ -73,7 +82,7 @@ public Table processDelayPerformance() {
7382
);
7483

7584
// Register result table
76-
tableEnv.createTemporaryView("airline_delay_performance", delayPerformance);
85+
tableEnv.createTemporaryView(delayPerformanceTableName, delayPerformance);
7786

7887
return delayPerformance;
7988
}
@@ -89,27 +98,27 @@ public Table processTimeWindowedDelays() {
8998
// Create flight table using the Table API
9099
Table flightTable = FlightTableApiFactory.createFlightTable(
91100
tableEnv,
92-
TABLE_NAME,
101+
tableName,
93102
topic,
94103
kafkaProperties
95104
);
96105

97106
// Use SQL for time-windowed analysis since it's more straightforward for this use case
98107
tableEnv.executeSql(
99-
"CREATE TEMPORARY VIEW hourly_delays AS " +
108+
"CREATE TEMPORARY VIEW " + hourlyDelaysTableName + " AS " +
100109
"SELECT " +
101110
" airline, " +
102111
" TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start, " +
103112
" TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end, " +
104113
" AVG(actualDeparture - scheduledDeparture) AS avg_delay, " +
105114
" COUNT(*) AS flight_count " +
106-
"FROM " + TABLE_NAME + " " +
115+
"FROM " + tableName + " " +
107116
"WHERE actualDeparture IS NOT NULL " +
108117
"GROUP BY airline, TUMBLE(event_time, INTERVAL '1' HOUR)"
109118
);
110119

111120
// Return the table
112-
return tableEnv.from("hourly_delays");
121+
return tableEnv.from(hourlyDelaysTableName);
113122
}
114123

115124
/**

flink-table-api/src/main/java/io/confluent/developer/sql/usecases/FlightRouteAnalytics.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.confluent.developer.sql.usecases;
22

3+
import io.confluent.developer.sql.config.ConfigLoader;
34
import io.confluent.developer.sql.table.FlightTableApiFactory;
45
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
56
import org.apache.flink.table.api.Table;
@@ -18,12 +19,14 @@
1819
*/
1920
public class FlightRouteAnalytics {
2021
private static final Logger LOG = LoggerFactory.getLogger(FlightRouteAnalytics.class);
21-
private static final String TABLE_NAME = "flight_source";
2222

2323
private final StreamExecutionEnvironment streamEnv;
2424
private final StreamTableEnvironment tableEnv;
2525
private final Properties kafkaProperties;
2626
private final String topic;
27+
private final String tableName;
28+
private final String routePopularityTableName;
29+
private final String airlineRoutesTableName;
2730

2831
/**
2932
* Creates a new FlightRouteAnalytics instance.
@@ -42,6 +45,12 @@ public FlightRouteAnalytics(
4245
this.tableEnv = tableEnv;
4346
this.kafkaProperties = kafkaProperties;
4447
this.topic = topic;
48+
this.tableName = ConfigLoader.getTableName(kafkaProperties, "flights", "Flights");
49+
this.routePopularityTableName = ConfigLoader.getTableName(kafkaProperties, "route-popularity", "RoutePopularity");
50+
this.airlineRoutesTableName = ConfigLoader.getTableName(kafkaProperties, "airline-routes", "AirlineRoutes");
51+
52+
LOG.info("Using table name: {}, route popularity table: {}, airline routes table: {}",
53+
tableName, routePopularityTableName, airlineRoutesTableName);
4554
}
4655

4756
/**
@@ -55,7 +64,7 @@ public Table processRoutePopularity() {
5564
// Create flight table using the Table API
5665
Table flightTable = FlightTableApiFactory.createFlightTable(
5766
tableEnv,
58-
TABLE_NAME,
67+
tableName,
5968
topic,
6069
kafkaProperties
6170
);
@@ -70,7 +79,7 @@ public Table processRoutePopularity() {
7079
);
7180

7281
// Register result table
73-
tableEnv.createTemporaryView("route_popularity", routePopularity);
82+
tableEnv.createTemporaryView(routePopularityTableName, routePopularity);
7483

7584
return routePopularity;
7685
}
@@ -86,7 +95,7 @@ public Table processAirlineRoutes() {
8695
// Create flight table using the Table API
8796
Table flightTable = FlightTableApiFactory.createFlightTable(
8897
tableEnv,
89-
TABLE_NAME,
98+
tableName,
9099
topic,
91100
kafkaProperties
92101
);
@@ -102,7 +111,7 @@ public Table processAirlineRoutes() {
102111
);
103112

104113
// Register result table
105-
tableEnv.createTemporaryView("airline_routes", airlineRoutes);
114+
tableEnv.createTemporaryView(airlineRoutesTableName, airlineRoutes);
106115

107116
return airlineRoutes;
108117
}

flink-table-api/src/main/java/io/confluent/developer/sql/usecases/FlightStatusDashboard.java

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,32 @@
2222
*/
2323
public class FlightStatusDashboard {
2424
private static final Logger LOG = LoggerFactory.getLogger(FlightStatusDashboard.class);
25-
private static final String TABLE_NAME = "flight_source";
26-
private static final String TOPIC_NAME = "flights";
25+
26+
private final StreamExecutionEnvironment streamEnv;
27+
private final StreamTableEnvironment tableEnv;
28+
private final Properties kafkaProperties;
29+
private final String topic;
30+
private final String tableName;
31+
32+
/**
33+
* Constructor for the FlightStatusDashboard.
34+
*
35+
* @param streamEnv The Flink streaming environment
36+
* @param tableEnv The Flink table environment
37+
* @param kafkaProperties Kafka properties
38+
* @param topic The Kafka topic to read from
39+
*/
40+
public FlightStatusDashboard(
41+
StreamExecutionEnvironment streamEnv,
42+
StreamTableEnvironment tableEnv,
43+
Properties kafkaProperties,
44+
String topic) {
45+
this.streamEnv = streamEnv;
46+
this.tableEnv = tableEnv;
47+
this.kafkaProperties = kafkaProperties;
48+
this.topic = topic;
49+
this.tableName = ConfigLoader.getTableName(kafkaProperties, "flights", "Flights");
50+
}
2751

2852
public static void main(String[] args) throws Exception {
2953
// Parse command line arguments
@@ -34,35 +58,59 @@ public static void main(String[] args) throws Exception {
3458

3559
// Load Kafka properties
3660
Properties properties = ConfigLoader.loadKafkaProperties(env);
61+
String topic = ConfigLoader.getTopicName(properties, "flights", "flights");
3762

3863
// Set up the streaming execution environment
3964
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
4065
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
4166

67+
// Create the dashboard instance
68+
FlightStatusDashboard dashboard = new FlightStatusDashboard(streamEnv, tableEnv, properties, topic);
69+
70+
// Process the flight status
71+
dashboard.processFlightStatus();
72+
73+
// Execute the job
74+
streamEnv.execute("Flight Status Dashboard");
75+
}
76+
77+
/**
78+
* Process the flight status data.
79+
* Creates a table from the Kafka topic and executes a query to count flights by status.
80+
*
81+
* @return The result table containing flight status counts
82+
*/
83+
public Table processFlightStatus() {
84+
LOG.info("Creating flight table: {}", tableName);
85+
4286
// Create the flight table using the Table API
4387
Table flightTable = FlightTableApiFactory.createFlightTable(
4488
tableEnv,
45-
TABLE_NAME,
46-
TOPIC_NAME,
47-
properties
89+
tableName,
90+
topic,
91+
kafkaProperties
4892
);
4993

5094
// Execute the flight status dashboard query using Table API
5195
LOG.info("Executing flight status query using Table API");
5296

5397
// Group by status and count flights
5498
Table resultTable = flightTable
55-
.groupBy($("status"))
99+
.filter($("status").isNotNull())
56100
.select(
101+
$("flight_number"),
102+
$("origin"),
103+
$("destination"),
57104
$("status"),
58-
$("status").count().as("flight_count")
105+
$("departure_time"),
106+
$("arrival_time")
59107
);
60108

61109
// Print the results
62110
LOG.info("Query result schema: {}", resultTable.getResolvedSchema());
63-
resultTable.execute().print();
111+
TableResult result = resultTable.execute();
112+
result.print();
64113

65-
// Execute the job
66-
streamEnv.execute("Flight Status Dashboard");
114+
return resultTable;
67115
}
68116
}

flink-table-api/src/main/java/io/confluent/developer/tableapi/FlinkTableApiMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
public class FlinkTableApiMain {
1919
private static final Logger LOG = LoggerFactory.getLogger(FlinkTableApiMain.class);
20-
private static final String DEFAULT_TOPIC = "flights";
20+
private static final String DEFAULT_TOPIC = "flights-avro";
2121
private static final String DEFAULT_ENV = "local";
2222

2323
public static void main(String[] args) throws Exception {

flink-table-api/src/main/java/io/confluent/developer/tableapi/config/ConfigLoader.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,32 @@ public static Properties loadKafkaProperties(String env) {
4747
return properties;
4848
}
4949

50+
/**
51+
* Gets a table name from properties, or returns the default name if not found.
52+
*
53+
* @param properties Properties to look up the table name in
54+
* @param key The key suffix to look for (will be prefixed with "table.")
55+
* @param defaultName The default table name to return if not found in properties
56+
* @return The table name from properties or the default
57+
*/
58+
public static String getTableName(Properties properties, String key, String defaultName) {
59+
String propertyKey = "table." + key;
60+
return properties.getProperty(propertyKey, defaultName);
61+
}
62+
63+
/**
64+
* Gets a topic name from properties, or returns the default name if not found.
65+
*
66+
* @param properties Properties to look up the topic name in
67+
* @param key The key suffix to look for (will be prefixed with "topic.")
68+
* @param defaultName The default topic name to return if not found in properties
69+
* @return The topic name from properties or the default
70+
*/
71+
public static String getTopicName(Properties properties, String key, String defaultName) {
72+
String propertyKey = "topic." + key;
73+
return properties.getProperty(propertyKey, defaultName);
74+
}
75+
5076
/**
5177
* Resolves environment variables in property values.
5278
* Replaces ${ENV_VAR} with the corresponding environment variable value.

0 commit comments

Comments
 (0)