Skip to content

Commit 2b43490

Browse files
authored
Merge pull request #131 from RanabirChakraborty/AMW-162
AMW-162 Add support for KRaft
2 parents c8862f7 + 6ccde12 commit 2b43490

File tree

16 files changed

+422
-5
lines changed

16 files changed

+422
-5
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: CI
33
on:
44
push:
55
branches:
6-
- main
6+
- AMW-162
77
pull_request:
88
schedule:
99
- cron: '0 6 * * *'
@@ -16,4 +16,4 @@ jobs:
1616
fqcn: 'middleware_automation/amq_streams'
1717
root_permission_varname: 'amq_streams_install_requires_become'
1818
molecule_tests: >-
19-
[ "default" ]
19+
[ "default", "amq_streams_kraft" ]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
---
2+
- name: "Automate AMQ Streams KRaft install"
3+
hosts: all
4+
vars:
5+
# Topic Management
6+
amq_streams_broker_topics:
7+
- name: sampleTopic
8+
partitions: 2
9+
replication_factor: 1
10+
- name: otherTopic
11+
partitions: 4
12+
replication_factor: 1
13+
vars_files:
14+
- vars.yml
15+
roles:
16+
- role: amq_streams_common
17+
- role: amq_streams_kraft
18+
tasks:
19+
- name: "Ensure AMQ Streams Broker is running and available."
20+
ansible.builtin.include_role:
21+
name: amq_streams_broker
22+
vars:
23+
amq_streams_common_skip_download: true
24+
25+
- name: "Ensure AMQ Streams Connect is running and available."
26+
ansible.builtin.include_role:
27+
name: amq_streams_connect
28+
vars:
29+
connectors:
30+
- { name: "file", path: "connectors/file.yml" }
31+
32+
- name: "Validate that KRaft deployment is functional."
33+
ansible.builtin.include_role:
34+
name: amq_streams_kraft
35+
tasks_from: validate.yml
36+
37+
- name: "Validate that Broker deployment is functional."
38+
ansible.builtin.include_role:
39+
name: amq_streams_broker
40+
tasks_from: validate.yml
41+
42+
- name: "Validate that Connect deployment is functional."
43+
ansible.builtin.include_role:
44+
name: amq_streams_connect
45+
tasks_from: validate.yml
46+
47+
post_tasks:
48+
- name: "Ensures topics exist."
49+
ansible.builtin.include_role:
50+
name: amq_streams_broker
51+
tasks_from: topic/create.yml
52+
loop: "{{ amq_streams_broker_topics }}"
53+
loop_control:
54+
loop_var: topic
55+
vars:
56+
topic_name: "{{ topic.name }}"
57+
topic_partitions: "{{ topic.partitions }}"
58+
topic_replication_factor: "{{ topic.replication_factor }}"
59+
60+
- name: "Describe created topics."
61+
ansible.builtin.include_role:
62+
name: amq_streams_broker
63+
tasks_from: topic/describe.yml
64+
loop: "{{ amq_streams_broker_topics }}"
65+
loop_control:
66+
loop_var: topic
67+
vars:
68+
topic_name: "{{ topic.name }}"
69+
70+
- name: "Delete topics"
71+
ansible.builtin.include_role:
72+
name: amq_streams_broker
73+
tasks_from: topic/delete.yml
74+
loop: "{{ amq_streams_broker_topics }}"
75+
loop_control:
76+
loop_var: topic
77+
vars:
78+
topic_name: "{{ topic.name }}"
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
---
2+
driver:
3+
name: docker
4+
platforms:
5+
- name: instance
6+
image: registry.access.redhat.com/ubi9/ubi-init:latest
7+
command: "/usr/sbin/init"
8+
pre_build_image: true
9+
privileged: true
10+
groups:
11+
- brokers
12+
provisioner:
13+
name: ansible
14+
config_options:
15+
defaults:
16+
interpreter_python: auto_silent
17+
ssh_connection:
18+
pipelining: false
19+
playbooks:
20+
prepare: ../prepare.yml
21+
converge: converge.yml
22+
verify: verify.yml
23+
inventory:
24+
host_vars:
25+
localhost:
26+
ansible_python_interpreter: "{{ ansible_playbook_python }}"
27+
env:
28+
ANSIBLE_FORCE_COLOR: "true"
29+
verifier:
30+
name: ansible
31+
scenario:
32+
test_sequence:
33+
- cleanup
34+
- destroy
35+
- syntax
36+
- create
37+
- prepare
38+
- converge
39+
- idempotence
40+
- side_effect
41+
- verify
42+
- cleanup
43+
- destroy

molecule/amq_streams_kraft/roles

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../roles
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
amq_streams_common_escalade_privilege_group_create: "{{ amq_streams_install_requires_become | default(true) }}"
3+
amq_streams_common_escalade_privilege_user_create: "{{ amq_streams_install_requires_become | default(true) }}"
4+
amq_streams_common_archive_extraction_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
5+
amq_streams_common_dependencies_require_priv: "{{ amq_streams_install_requires_become | default(true) }}"
6+
amq_streams_zookeeper_data_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
7+
amq_streams_zookeeper_restart_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
8+
amq_streams_broker_tls_truststore_client_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
9+
amq_streams_broker_config_files_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
10+
amq_streams_cruise_control_path_to_capacity_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
11+
amq_streams_cruise_control_logfiles_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
12+
amq_streams_connect_source_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
13+
amq_streams_kraft_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
14+
amq_streams_common_product_version: 4.1.1
15+
# Run the Systemd Service as root
16+
amq_streams_broker_user: root
17+
amq_streams_broker_group: root
18+
19+
# Run KRaft tasks as root
20+
amq_streams_kraft_user: root
21+
amq_streams_kraft_group: root
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
- name: Verify
3+
hosts: all
4+
tasks:
5+
6+
- name: Populate service facts
7+
ansible.builtin.service_facts:
8+
9+
- name: Check broker service
10+
assert:
11+
that:
12+
- ansible_facts.services["amq_streams_broker.service"]["state"] == "running"
13+
- ansible_facts.services["amq_streams_broker.service"]["status"] == "enabled"

roles/amq_streams_broker/templates/server.properties.j2

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,35 @@
2020
# See kafka.server.KafkaConfig for additional details and defaults
2121
#
2222

23+
# -----------------------------------------------------------------------------
24+
# MODE DETERMINATION (Added for Kafka 4.0+ KRaft Support)
25+
# -----------------------------------------------------------------------------
26+
{% set enable_kraft = amq_streams_enable_kraft | default(amq_streams_common_product_version is version('4.0.0', '>=')) | bool %}
27+
2328
############################# Server Basics #############################
2429

2530
# The id of the broker. This must be set to a unique integer for each broker.
31+
{% if enable_kraft %}
32+
# KRaft uses node.id instead of broker.id
33+
node.id={{ amq_streams_kraft_node_id | default(1) }}
34+
{% else %}
2635
broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory_group.index(inventory_hostname)) }}
36+
{% endif %}
37+
38+
############################# KRaft Settings (Kafka 4.0+) #############################
39+
{% if enable_kraft %}
40+
# The roles of this process. broker, controller, or both.
41+
process.roles={{ amq_streams_kraft_process_roles | default('broker,controller') }}
42+
43+
# The connect string for the controller quorum
44+
controller.quorum.voters={{ amq_streams_kraft_controller_quorum_voters }}
45+
46+
# Listener name used for the controller
47+
controller.listener.names={{ amq_streams_kraft_controller_listener_names | default('CONTROLLER') }}
48+
49+
# Listener name used for inter-broker communication
50+
inter.broker.listener.name={{ amq_streams_kraft_inter_broker_listener_name | default('PLAINTEXT') }}
51+
{% endif %}
2752

2853
############################# Socket Server Settings #############################
2954

@@ -33,27 +58,43 @@ broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory
3358
# listeners = listener_name://host_name:port
3459
# EXAMPLE:
3560
# listeners = PLAINTEXT://your.host.name:9092
61+
{% if enable_kraft %}
62+
# KRaft Mode Listeners (Requires Broker + Controller ports)
63+
listeners={{ amq_streams_kraft_listeners | join(",") }}
64+
{% else %}
65+
# Legacy ZK Mode Listeners
3666
{% if amq_streams_broker_listeners is defined %}
3767
listeners={{ amq_streams_broker_listeners | join(",") }}
3868
{% elif amq_streams_broker_listener_port is defined %}
3969
listeners=PLAINTEXT://:{{ amq_streams_broker_listener_port }}
4070
{% else %}
4171
#listeners=PLAINTEXT://:9092
4272
{% endif %}
73+
{% endif %}
4374

44-
{% if amq_streams_broker_inter_broker_listener is defined %}
45-
# Name of listener used for communication between brokers
75+
{% if amq_streams_broker_inter_broker_listener is defined and not enable_kraft %}
76+
# Name of listener used for communication between brokers (Legacy ZK only)
4677
inter.broker.listener.name={{ amq_streams_broker_inter_broker_listener }}
4778
{% endif %}
4879

4980
# Listener name, hostname and port the broker will advertise to clients.
5081
# If not set, it uses the value for "listeners".
82+
{% if enable_kraft %}
83+
# KRaft Mode Advertised Listeners (Broker port only)
84+
advertised.listeners={{ amq_streams_kraft_advertised_listeners | join(",") }}
85+
{% else %}
86+
# Legacy ZK Mode Advertised Listeners
5187
{% if amq_streams_broker_advertised_listeners is defined %}
5288
advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }}
5389
{% else %}
5490
#advertised.listeners=PLAINTEXT://your.host.name:9092
5591
{% endif %}
92+
{% endif %}
5693

94+
{% if enable_kraft %}
95+
# KRaft Mode Security Map (Must include Controller)
96+
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
97+
{% else %}
5798
{% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %}
5899
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
59100
listener.security.protocol.map={{ amq_streams_broker_auth_listeners | join(",") }}
@@ -68,6 +109,7 @@ sasl.mechanism.inter.broker.protocol={{ amq_streams_broker_inter_broker_auth_sas
68109
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
69110
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
70111
{% endif %}
112+
{% endif %}
71113

72114
{% if amq_streams_broker_inter_broker_listener_auth is defined %}
73115
# Authentication mechanism for the inter-broker listener
@@ -105,7 +147,11 @@ socket.request.max.bytes={{ amq_streams_broker_socket_request_max_bytes }}
105147
############################# Log Basics #############################
106148

107149
# A comma separated list of directories under which to store log files
150+
{% if enable_kraft %}
151+
log.dirs={{ amq_streams_kraft_log_dirs }}
152+
{% else %}
108153
log.dirs={{ amq_streams_broker_data_dir }}
154+
{% endif %}
109155

110156
# The default number of log partitions per topic. More partitions allow greater
111157
# parallelism for consumption, but this will also result in more files across
@@ -162,7 +208,7 @@ log.retention.hours={{ amq_streams_broker_log_retention_hours }}
162208
log.retention.check.interval.ms={{ amq_streams_broker_log_retention_check_interval_ms }}
163209

164210
############################# Zookeeper #############################
165-
211+
{% if not enable_kraft %}
166212
# Zookeeper connection string (see zookeeper docs for details).
167213
# This is a comma separated host:port pairs, each corresponding to a zk
168214
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
@@ -173,6 +219,7 @@ zookeeper.connect={{ amq_streams_broker_zookeeper_host }}:{{ amq_streams_broker_
173219
# Timeout in ms for connecting to zookeeper
174220
zookeeper.connection.timeout.ms={{ amq_streams_broker_zookeeper_connection_timeout_ms }}
175221
zookeeper.session.timeout.ms={{ amq_streams_broker_zookeeper_session_timeout_ms }}
222+
{% endif %}
176223

177224
############################# Group Coordinator Settings #############################
178225

roles/amq_streams_kraft/README.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# AMQ Streams KRaft Role
2+
3+
This role orchestrates the initialization and configuration of Apache Kafka in KRaft (Kafka Raft Metadata) mode, removing the traditional dependency on ZooKeeper. It handles the mandatory bootstrapping process by generating a unique Cluster UUID (if not provided) and executing kafka-storage.sh to format the storage directories with the necessary metadata. Additionally, it prepares the critical configuration parameters required for a ZooKeeper-less environment, ensuring the correct setup of process roles, controller quorums, and the strict separation of broker and controller listeners.
4+
5+
## Requirements
6+
7+
* **Role Dependencies**: `amq_streams_common` (must be run first to install the Kafka binaries).
8+
* **Kafka Version**: Designed for Kafka 4.0.0+ or Kafka 3.x with KRaft enabled.
9+
10+
## Role Variables
11+
12+
| Variable | Default Value | Description |
13+
| :--- | :--- | :--- |
14+
| `amq_streams_install_dir` | `/opt` | The base directory where AMQ Streams/Kafka is installed. |
15+
| `amq_streams_kafka_home` | `{{ amq_streams_install_dir }}/kafka_{{ amq_streams_common_version }}/` | The absolute path to the Kafka installation home directory. |
16+
| `amq_streams_kraft_config_dir` | `{{ amq_streams_kafka_home }}/config` | The directory containing `server.properties` and other config files. |
17+
| `amq_streams_kraft_data_dir` | `{{ amq_streams_kafka_home }}/data/kraft` | The directory where Kafka stores its KRaft metadata and log data. |
18+
| `amq_streams_cluster_id` | `""` | The UUID of the Kafka cluster. If left empty, the role will auto-generate a random UUID during the first run. |
19+
| `amq_streams_kraft_node_id` | `1` | The unique integer ID for this specific broker/controller node. **Must be unique per host.** |
20+
| `amq_streams_kraft_process_roles` | `"broker,controller"` | Defines the role of this node. Options: `broker`, `controller`, or `broker,controller` (combined). |
21+
| `amq_streams_kraft_controller_quorum_voters` | `1@{{ ansible_host }}:{{ amq_streams_kraft_controller_port }}` | The voter connection string in the format `nodeId@host:port`. Must list all controller nodes in the cluster. |
22+
| `amq_streams_kraft_listener_port` | `9092` | The port used for standard Client (broker) traffic. |
23+
| `amq_streams_kraft_controller_port` | `9093` | The port used for Controller-to-Controller Raft communication. |
24+
| `amq_streams_kraft_listeners` | *(See defaults)* | A list of all listeners to bind. Must include **both** the Controller and Broker listeners (e.g., `PLAINTEXT://...` and `CONTROLLER://...`). |
25+
| `amq_streams_kraft_advertised_listeners` | *(See defaults)* | A list of listeners advertised to clients. Must **ONLY** include Broker listeners (e.g., `PLAINTEXT://...`). |
26+
| `amq_streams_kraft_controller_listener_names` | `"CONTROLLER"` | The listener name used by the controller quorum (must match an entry in `listeners`). |
27+
| `amq_streams_kraft_inter_broker_listener_name` | `"PLAINTEXT"` | The listener name used for replication between brokers. |
28+
| `amq_streams_kraft_log_dirs` | `{{ amq_streams_kraft_data_dir }}` | The comma-separated list of directories for log data. Usually matches the data dir. |
29+
| `amq_streams_kraft_log_retention_hours` | `168` | The number of hours to keep log segments before deletion (Default: 7 days). |
30+
| `amq_streams_kraft_priv_escalation` | `yes` | Controls whether tasks (like creating directories and formatting storage) run with elevated privileges (`become: true`). |
31+
32+
## Example Playbook
33+
34+
```yaml
35+
---
36+
- hosts: kafka_brokers
37+
vars:
38+
amq_streams_common_version: "3.7.0"
39+
amq_streams_enable_kraft: true
40+
41+
roles:
42+
# Install Java and Kafka Binaries
43+
- role: amq_streams_common
44+
45+
# Configure and Format Storage for KRaft
46+
- role: amq_streams_kraft
47+
when: amq_streams_enable_kraft | bool
48+
49+
# Start the Kafka Service
50+
- role: amq_streams_broker
51+
```
52+
53+
## License
54+
55+
Apache License v2.0 or later
56+
57+
## Author Information
58+
59+
* [Ranabir Chakraborty](https://github.com/RanabirChakraborty)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
amq_streams_install_dir: "/opt"
3+
amq_streams_kafka_home: "{{ amq_streams_install_dir }}/kafka_{{ amq_streams_common_version }}/"
4+
amq_streams_kraft_config_dir: "{{ amq_streams_kafka_home }}/config"
5+
amq_streams_kraft_data_dir: "{{ amq_streams_kafka_home }}/data/kraft"
6+
amq_streams_cluster_id: ""
7+
amq_streams_kraft_node_id: 1
8+
amq_streams_kraft_listener_port: 9092
9+
amq_streams_kraft_controller_port: 9093
10+
amq_streams_kraft_controller_quorum_voters: "1@{{ ansible_host }}:{{ amq_streams_kraft_controller_port }}"
11+
amq_streams_kraft_listeners:
12+
- "PLAINTEXT://0.0.0.0:{{ amq_streams_kraft_listener_port }}"
13+
- "CONTROLLER://0.0.0.0:{{ amq_streams_kraft_controller_port }}"
14+
amq_streams_kraft_advertised_listeners:
15+
- "PLAINTEXT://{{ ansible_host }}:{{ amq_streams_kraft_listener_port }}"
16+
amq_streams_kraft_controller_listener_names: "CONTROLLER"
17+
amq_streams_kraft_inter_broker_listener_name: "PLAINTEXT"
18+
amq_streams_kraft_process_roles: "broker,controller"
19+
amq_streams_kraft_log_dirs: "{{ amq_streams_kraft_data_dir }}"
20+
amq_streams_kraft_log_retention_hours: 168
21+
amq_streams_kraft_priv_escalation: yes
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
- name: Reload systemd
3+
ansible.builtin.systemd:
4+
daemon_reload: true
5+
6+
- name: Restart Kafka
7+
ansible.builtin.systemd:
8+
name: "{{ server_name | default('amq_streams_broker') }}"
9+
state: restarted

0 commit comments

Comments
 (0)