Skip to content

Commit 0870e40

Browse files
committed
refactoring, adding more tests
1 parent 0120518 commit 0870e40

22 files changed

+2896
-831
lines changed

Agenda.adoc

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
= Apache Flink Workshop - Full-Day Agenda
2+
3+
📅 *Time:* 9:00 AM – 5:00 PM
4+
☕ *Breaks:* Coffee breaks (morning & afternoon) + 1-hour lunch
5+
🔧 *Format:* Mix of lectures, hands-on exercises, and interactive Q&A
6+
7+
== 9:00 AM – 9:30 AM | Welcome & Workshop Setup
8+
- Instructor introduction & agenda overview
9+
- Quick participant introductions (optional)
10+
- Verify setup: Docker, Flink, Confluent Cloud access
11+
- Troubleshooting any setup issues
12+
13+
== 9:30 AM – 10:30 AM | Introduction to Apache Flink & Stream Processing
14+
- Why real-time processing? Challenges with traditional systems
15+
- Apache Flink architecture overview
16+
- Key components: JobManager, TaskManager, Checkpoints
17+
- Understanding Flink’s execution model (DataStream vs. Table API)
18+
19+
💻 *Hands-on:* Running your first Flink job in Docker
20+
21+
☕ *10:30 AM – 10:45 AM | Morning Coffee Break*
22+
23+
== 10:45 AM – 12:15 PM | DataStream API: Core Concepts & Hands-on
24+
- Understanding event time, watermarks, and windowing
25+
- Stateless vs. stateful processing
26+
- Working with transformations (map, filter, flatMap, keyBy)
27+
- Managing state in Flink applications
28+
29+
💻 *Hands-on:*
30+
- Implementing a basic Flink job using the DataStream API
31+
- Experimenting with windowed aggregations
32+
33+
🥗 *12:15 PM – 1:15 PM | Lunch Break*
34+
35+
== 1:15 PM – 2:15 PM | Table API & SQL in Flink
36+
- Introduction to Flink SQL & Table API
37+
- Benefits of declarative stream processing
38+
- Converting between DataStream & Table API
39+
- Querying real-time data with Flink SQL
40+
41+
💻 *Hands-on:*
42+
- Running SQL queries on a streaming dataset
43+
- Creating a Flink SQL pipeline with Kafka as a source
44+
45+
☕ *2:15 PM – 2:30 PM | Afternoon Coffee Break*
46+
47+
== 2:30 PM – 3:30 PM | Connecting Flink with Kafka & Confluent Cloud
48+
- Why Flink + Kafka is a powerful combination
49+
- Configuring Flink to read/write from Confluent Cloud
50+
- Best practices for handling Kafka topics in Flink
51+
52+
💻 *Hands-on:*
53+
- Deploying a Flink job that reads from a Kafka topic
54+
- Writing processed results back to Kafka
55+
56+
== 3:30 PM – 4:15 PM | Testing & Debugging Flink Applications
57+
- Best practices for unit and integration testing in Flink
58+
- Using Flink's MiniCluster for local testing
59+
- Debugging & logging strategies
60+
61+
💻 *Hands-on:*
62+
- Writing tests for Flink jobs
63+
- Running a test-driven Flink pipeline
64+
65+
== 4:15 PM – 4:45 PM | Automating Flink Deployment with Terraform (Optional)
66+
- Infrastructure as Code for Flink & Kafka
67+
- Setting up Flink jobs using Terraform
68+
- Automating deployment & scaling
69+
70+
💻 *Hands-on:* Running Terraform scripts to deploy a Flink pipeline
71+
72+
== 4:45 PM – 5:00 PM | Wrap-up & Q&A
73+
- Recap of key concepts
74+
- Additional resources for deep diving into Flink
75+
- Open Q&A & feedback session

Makefile

Lines changed: 165 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,27 @@ CLOUD=☁️
2020
STAR=⭐
2121
TERRAFORM=🌐
2222

23+
# Common variables
24+
GRADLE=./gradlew
25+
FLINK_TABLE_API_MODULE=:flink-table-api:run
26+
DATA_GENERATOR_MODULE=:flink-data-generator:run
27+
REF_GENERATOR_MODULE=:data-generator:run
28+
29+
# Environment options
30+
ENV_LOCAL=local
31+
ENV_CLOUD=cloud
32+
33+
# Use case options
34+
USECASE_STATUS=status
35+
USECASE_ROUTES=routes
36+
USECASE_DELAYS=delays
37+
USECASE_ALL=all
38+
39+
# Function to generate Gradle run command with arguments
40+
define gradle_run
41+
$(GRADLE) $(1) --args="--useCase $(2) --env $(3)"
42+
endef
43+
2344
# Default target
2445
.PHONY: help
2546
help:
@@ -92,6 +113,13 @@ help:
92113
@echo "${BLUE}${INFO} ☁️ tf-upgrade${RESET} - Shorthand for terraform-upgrade"
93114
@echo "${BLUE}${INFO} ☁️ tf-org-id${RESET} - Shorthand for terraform-org-id"
94115
@echo ""
116+
@echo "${YELLOW}${STAR} Configuration Management:${RESET}"
117+
@echo "${BLUE}${INFO} 🔧 config-init${RESET} - Initialize configuration directories"
118+
@echo "${BLUE}${INFO} 🏠 config-local${RESET} - Generate local configuration files"
119+
@echo "${BLUE}${INFO} ☁️ config-cloud${RESET} - Generate cloud configuration files"
120+
@echo "${BLUE}${INFO} 📊 config-app-flink-table-api${RESET} - Generate Flink Table API application configuration"
121+
@echo "${BLUE}${INFO} 📋 config-list${RESET} - List all configuration files"
122+
@echo ""
95123
@echo "${YELLOW}${STAR} Cleanup:${RESET}"
96124
@echo "${BLUE}${INFO} 🧹 clean${RESET} - Clean up temporary files"
97125
@echo ""
@@ -317,8 +345,8 @@ terraform-org-id:
317345
terraform-output:
318346
@echo "${BLUE}${CLOUD} Generating cloud.properties from Terraform output...${RESET}"
319347
@mkdir -p ../common/utils/src/main/resources
320-
cd terraform && terraform output -json | jq -r 'to_entries | map( {key: .key|tostring|split("_")|join("."), value: .value} ) | map("\(.key)=\(.value.value)") | .[]' | while read -r line ; do echo "$$line"; done > ../common/utils/src/main/resources/cloud.properties
321-
@echo "${GREEN}${CHECK} cloud.properties generated in ../common/utils/src/main/resources/cloud.properties!${RESET}"
348+
cd terraform && terraform output -json | jq -r 'to_entries | map( {key: .key|tostring|split("_")|join("."), value: .value} ) | map("\(.key)=\(.value.value)") | .[]' | while read -r line ; do echo "$$line"; done > ../config/cloud/cloud.properties
349+
@echo "${GREEN}${CHECK} cloud.properties generated in ../config/cloud/cloud.properties!${RESET}"
322350

323351
# Shorthand commands for Terraform operations
324352
.PHONY: tf-init
@@ -389,53 +417,41 @@ run-sql:
389417
./gradlew :flink-table-api:run
390418

391419
# Flink Table API Module
392-
.PHONY: run-sql-status-local
393-
run-sql-status-local:
394-
@echo "${BLUE}${ROCKET} Running Flight Status Dashboard with Table API (local)...${RESET}"
395-
./gradlew :flink-table-api:run --args="status local"
396-
@echo "${GREEN}${CHECK} Flight Status Dashboard completed!${RESET}"
397420

398-
.PHONY: run-sql-routes-local
399-
run-sql-routes-local:
400-
@echo "${BLUE}${ROCKET} Running Flight Route Analytics with Table API (local)...${RESET}"
401-
./gradlew :flink-table-api:run --args="routes local"
402-
@echo "${GREEN}${CHECK} Flight Route Analytics completed!${RESET}"
421+
# Helper function to run SQL use cases
422+
define run_sql_usecase
423+
@echo "${BLUE}$(if $(filter $(ENV_CLOUD),$(3)),${CLOUD},${ROCKET}) Running $(1) with Table API ($(3))...${RESET}"
424+
$(call gradle_run,$(FLINK_TABLE_API_MODULE),$(2),$(3))
425+
@echo "${GREEN}${CHECK} $(1) completed!${RESET}"
426+
endef
403427

404-
.PHONY: run-sql-delays-local
405-
run-sql-delays-local:
406-
@echo "${BLUE}${ROCKET} Running Airline Delay Analytics with Table API (local)...${RESET}"
407-
./gradlew :flink-table-api:run --args="delays local"
408-
@echo "${GREEN}${CHECK} Airline Delay Analytics completed!${RESET}"
409-
410-
.PHONY: run-sql-all-local
411-
run-sql-all-local:
412-
@echo "${BLUE}${ROCKET} Running all SQL use cases with Table API (local)...${RESET}"
413-
./gradlew :flink-table-api:run --args="all local"
414-
@echo "${GREEN}${CHECK} All SQL use cases completed!${RESET}"
428+
.PHONY: run-sql-status-local run-sql-status-cloud
429+
run-sql-status-local:
430+
$(call run_sql_usecase,Flight Status Dashboard,$(USECASE_STATUS),$(ENV_LOCAL))
415431

416-
.PHONY: run-sql-status-cloud
417432
run-sql-status-cloud:
418-
@echo "${BLUE}${CLOUD} Running Flight Status Dashboard with Table API (cloud)...${RESET}"
419-
./gradlew :flink-table-api:run --args="status cloud"
420-
@echo "${GREEN}${CHECK} Flight Status Dashboard completed!${RESET}"
433+
$(call run_sql_usecase,Flight Status Dashboard,$(USECASE_STATUS),$(ENV_CLOUD))
434+
435+
.PHONY: run-sql-routes-local run-sql-routes-cloud
436+
run-sql-routes-local:
437+
$(call run_sql_usecase,Flight Route Analytics,$(USECASE_ROUTES),$(ENV_LOCAL))
421438

422-
.PHONY: run-sql-routes-cloud
423439
run-sql-routes-cloud:
424-
@echo "${BLUE}${CLOUD} Running Flight Route Analytics with Table API (cloud)...${RESET}"
425-
./gradlew :flink-table-api:run --args="routes cloud"
426-
@echo "${GREEN}${CHECK} Flight Route Analytics completed!${RESET}"
440+
$(call run_sql_usecase,Flight Route Analytics,$(USECASE_ROUTES),$(ENV_CLOUD))
441+
442+
.PHONY: run-sql-delays-local run-sql-delays-cloud
443+
run-sql-delays-local:
444+
$(call run_sql_usecase,Airline Delay Analytics,$(USECASE_DELAYS),$(ENV_LOCAL))
427445

428-
.PHONY: run-sql-delays-cloud
429446
run-sql-delays-cloud:
430-
@echo "${BLUE}${CLOUD} Running Airline Delay Analytics with Table API (cloud)...${RESET}"
431-
./gradlew :flink-table-api:run --args="delays cloud"
432-
@echo "${GREEN}${CHECK} Airline Delay Analytics completed!${RESET}"
447+
$(call run_sql_usecase,Airline Delay Analytics,$(USECASE_DELAYS),$(ENV_CLOUD))
448+
449+
.PHONY: run-sql-all-local run-sql-all-cloud
450+
run-sql-all-local:
451+
$(call run_sql_usecase,All SQL use cases,$(USECASE_ALL),$(ENV_LOCAL))
433452

434-
.PHONY: run-sql-all-cloud
435453
run-sql-all-cloud:
436-
@echo "${BLUE}${CLOUD} Running all SQL use cases with Table API (cloud)...${RESET}"
437-
./gradlew :flink-table-api:run --args="all cloud"
438-
@echo "${GREEN}${CHECK} All SQL use cases completed!${RESET}"
454+
$(call run_sql_usecase,All SQL use cases,$(USECASE_ALL),$(ENV_CLOUD))
439455

440456
# Docker Compose targets
441457
.PHONY: docker-up docker-down docker-ps docker-logs docker-restart
@@ -524,59 +540,137 @@ flink-sql-execute:
524540
fi
525541
docker compose run --rm -e SQL_FILE=$(SQL_FILE) sql-client
526542

543+
# Helper function to build and run generators
544+
define build_generator
545+
@echo "${BLUE}${ROCKET} Building $(1)...${RESET}"
546+
$(GRADLE) $(2):build
547+
@echo "${GREEN}${CHECK} $(1) built successfully!${RESET}"
548+
endef
549+
550+
define run_generator
551+
@echo "${BLUE}$(if $(filter $(ENV_CLOUD),$(3)),${CLOUD},${ROCKET}) Running $(1) in $(3) environment...${RESET}"
552+
$(GRADLE) $(2) --args="--env $(3)"
553+
@echo "${GREEN}${CHECK} $(1) completed!${RESET}"
554+
endef
555+
556+
define run_generator_with_props
557+
@echo "${BLUE}${ROCKET} Running $(1) with custom properties...${RESET}"
558+
@if [ -z "$(PROPS)" ]; then \
559+
echo "${RED}${ERROR} Please specify properties file with PROPS=path/to/properties.${RESET}"; \
560+
exit 1; \
561+
fi
562+
$(GRADLE) $(2) --args="--properties $(PROPS)"
563+
@echo "${GREEN}${CHECK} $(1) completed!${RESET}"
564+
endef
565+
527566
# Flink Data Generator targets
528567
.PHONY: build-data-generator run-data-generator-local run-data-generator-cloud run-data-generator-with-props
529568

530569
build-data-generator:
531-
@echo "${BLUE}${ROCKET} Building Flink Data Generator...${RESET}"
532-
./gradlew :flink-data-generator:build
533-
@echo "${GREEN}${CHECK} Flink Data Generator built successfully!${RESET}"
570+
$(call build_generator,Flink Data Generator,$(DATA_GENERATOR_MODULE))
534571

535572
run-data-generator-local:
536-
@echo "${BLUE}${ROCKET} Running Flink Data Generator in local environment...${RESET}"
537-
./gradlew :flink-data-generator:run --args="--env local"
538-
@echo "${GREEN}${CHECK} Flink Data Generator completed!${RESET}"
573+
$(call run_generator,Flink Data Generator,$(DATA_GENERATOR_MODULE),$(ENV_LOCAL))
539574

540575
run-data-generator-cloud:
541-
@echo "${BLUE}${CLOUD} Running Flink Data Generator in cloud environment...${RESET}"
542-
./gradlew :flink-data-generator:run --args="--env cloud"
543-
@echo "${GREEN}${CHECK} Flink Data Generator completed!${RESET}"
576+
$(call run_generator,Flink Data Generator,$(DATA_GENERATOR_MODULE),$(ENV_CLOUD))
544577

545578
run-data-generator-with-props:
546-
@echo "${BLUE}${ROCKET} Running Flink Data Generator with custom properties...${RESET}"
547-
@if [ -z "$(PROPS)" ]; then \
548-
echo "${RED}${ERROR} Please specify properties file with PROPS=path/to/properties.${RESET}"; \
549-
exit 1; \
550-
fi
551-
./gradlew :flink-data-generator:run --args="--properties $(PROPS)"
552-
@echo "${GREEN}${CHECK} Flink Data Generator completed!${RESET}"
579+
$(call run_generator_with_props,Flink Data Generator,$(DATA_GENERATOR_MODULE))
553580

554581
# Reference Data Generator targets
555582
.PHONY: build-ref-generator run-ref-generator-local run-ref-generator-cloud run-ref-generator-with-props
556583

557584
build-ref-generator:
558-
@echo "${BLUE}${ROCKET} Building Reference Data Generator...${RESET}"
559-
./gradlew :data-generator:build
560-
@echo "${GREEN}${CHECK} Reference Data Generator built successfully!${RESET}"
585+
$(call build_generator,Reference Data Generator,$(REF_GENERATOR_MODULE))
561586

562587
run-ref-generator-local:
563-
@echo "${BLUE}${ROCKET} Running Reference Data Generator in local environment...${RESET}"
564-
./gradlew :data-generator:run --args="--env local"
565-
@echo "${GREEN}${CHECK} Reference Data Generator completed!${RESET}"
588+
$(call run_generator,Reference Data Generator,$(REF_GENERATOR_MODULE),$(ENV_LOCAL))
566589

567590
run-ref-generator-cloud:
568-
@echo "${BLUE}${CLOUD} Running Reference Data Generator in cloud environment...${RESET}"
569-
./gradlew :data-generator:run --args="--env cloud"
570-
@echo "${GREEN}${CHECK} Reference Data Generator completed!${RESET}"
591+
$(call run_generator,Reference Data Generator,$(REF_GENERATOR_MODULE),$(ENV_CLOUD))
571592

572593
run-ref-generator-with-props:
573-
@echo "${BLUE}${ROCKET} Running Reference Data Generator with custom properties...${RESET}"
574-
@if [ -z "$(PROPS)" ]; then \
575-
echo "${RED}${ERROR} Please specify properties file with PROPS=path/to/properties.${RESET}"; \
576-
exit 1; \
577-
fi
578-
./gradlew :data-generator:run --args="--properties $(PROPS)"
579-
@echo "${GREEN}${CHECK} Reference Data Generator completed!${RESET}"
594+
$(call run_generator_with_props,Reference Data Generator,$(REF_GENERATOR_MODULE))
595+
596+
# Configuration Management
597+
598+
# Configuration directories
599+
CONFIG_DIR=config
600+
CONFIG_LOCAL_DIR=$(CONFIG_DIR)/local
601+
CONFIG_CLOUD_DIR=$(CONFIG_DIR)/cloud
602+
CONFIG_APP_DIR=$(CONFIG_DIR)/application
603+
604+
# Topic names
605+
TOPICS=flights-avro airlines airports weather flight-delays
606+
TOPIC_NAMES=$(foreach topic,$(TOPICS),topic.$(topic)=$(topic))
607+
608+
# Table names
609+
TABLES=Flights Airlines Airports AirlineDelayPerformance HourlyDelays RoutePopularity AirlineRoutes
610+
TABLE_NAMES=$(foreach table,$(TABLES),table.$(shell echo $(table) | sed 's/\([A-Z]\)/-\1/g' | sed 's/^-//' | tr '[:upper:]' '[:lower:]')=$(table))
611+
612+
# Helper function to create configuration file
613+
define create_config_file
614+
@echo "# $(1)" > $(2)
615+
@for line in $(3); do \
616+
echo "$$line" >> $(2); \
617+
done
618+
@echo "" >> $(2)
619+
endef
620+
621+
.PHONY: config-init config-local config-cloud config-app-flink-table-api config-list
622+
623+
config-init:
624+
@echo "${BLUE}${INFO} Initializing configuration directories...${RESET}"
625+
@mkdir -p $(CONFIG_LOCAL_DIR)
626+
@mkdir -p $(CONFIG_CLOUD_DIR)
627+
@mkdir -p $(CONFIG_APP_DIR)
628+
@echo "${GREEN}${CHECK} Configuration directories created.${RESET}"
629+
630+
config-local: config-init
631+
@echo "${BLUE}${ROCKET} Generating local configuration files...${RESET}"
632+
$(call create_config_file,Local Kafka Configuration,$(CONFIG_LOCAL_DIR)/kafka.properties, \
633+
"bootstrap.servers=localhost:9092" \
634+
"schema.registry.url=http://localhost:8081" \
635+
"security.protocol=PLAINTEXT")
636+
637+
$(call create_config_file,Local Topic Names Configuration,$(CONFIG_LOCAL_DIR)/topics.properties,$(TOPIC_NAMES))
638+
$(call create_config_file,Local Table Names Configuration,$(CONFIG_LOCAL_DIR)/tables.properties,$(TABLE_NAMES))
639+
@echo "${GREEN}${CHECK} Local configuration files generated.${RESET}"
640+
641+
config-cloud: config-init
642+
@echo "${BLUE}${CLOUD} Generating cloud configuration files template...${RESET}"
643+
$(call create_config_file,Cloud Kafka Configuration,$(CONFIG_CLOUD_DIR)/kafka.properties, \
644+
"bootstrap.servers=\$${BOOTSTRAP_SERVERS}" \
645+
"schema.registry.url=\$${SCHEMA_REGISTRY_URL}" \
646+
"security.protocol=SASL_SSL" \
647+
"sasl.mechanism=PLAIN" \
648+
"sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='\$${API_KEY}' password='\$${API_SECRET}';" \
649+
"basic.auth.credentials.source=USER_INFO" \
650+
"basic.auth.user.info=\$${SR_API_KEY}:\$${SR_API_SECRET}" \
651+
"schema.registry.basic.auth.user.info=\$${SR_API_KEY}:\$${SR_API_SECRET}")
652+
653+
$(call create_config_file,Cloud Topic Names Configuration,$(CONFIG_CLOUD_DIR)/topics.properties,$(TOPIC_NAMES))
654+
$(call create_config_file,Cloud Table Names Configuration,$(CONFIG_CLOUD_DIR)/tables.properties,$(TABLE_NAMES))
655+
@echo "${GREEN}${CHECK} Cloud configuration files template generated.${RESET}"
656+
@echo "${YELLOW}${WARNING} Remember to replace environment variables in cloud/kafka.properties with actual values.${RESET}"
657+
658+
config-app-flink-table-api: config-init
659+
@echo "${BLUE}${INFO} Generating Flink Table API application configuration...${RESET}"
660+
$(call create_config_file,Flink Table API Application Configuration,$(CONFIG_APP_DIR)/flink-table-api.properties, \
661+
"app.name=Flink Table API" \
662+
"app.version=1.0.0" \
663+
"app.parallelism=2" \
664+
"app.checkpoint.interval=60000" \
665+
"app.state.backend=rocksdb" \
666+
"app.restart.strategy=fixed-delay" \
667+
"app.restart.attempts=3" \
668+
"app.restart.delay=10000")
669+
@echo "${GREEN}${CHECK} Flink Table API application configuration generated.${RESET}"
670+
671+
config-list:
672+
@echo "${BLUE}${INFO} Listing all configuration files:${RESET}"
673+
@find $(CONFIG_DIR) -type f | sort
580674

581675
# Clean up
582676
.PHONY: clean

docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
kafka:
3-
image: apache/kafka:3.9.0
3+
image: apache/kafka:4.0.0
44
hostname: kafka
55
ports:
66
- "9092:9092"

0 commit comments

Comments (0)