# Kafka Cluster Deployment (3 Nodes)

# I. Cluster Planning

Deploy Kafka on all three nodes: node1, node2, and node3.
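The commands in the following sections assume that the hostnames node1, node2, and node3 resolve from every machine. If they do not, one option is a shared /etc/hosts mapping; a minimal sketch, where the IP addresses are placeholders for illustration only:

cat >> /etc/hosts <<EOF
192.168.1.101 node1
192.168.1.102 node2
192.168.1.103 node3
EOF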

# II. Download

# Download Kafka 3.0.0 (Scala 2.12 build) from the Tsinghua mirror
cd ~ && wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate


# III. Installation

# Extract the archive
tar -zxvf kafka_2.12-3.0.0.tgz -C /usr/local/
# Rename the directory
mv /usr/local/kafka_2.12-3.0.0/ /usr/local/kafka
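As a quick sanity check that the extraction worked, the bundled tooling can print its version:

/usr/local/kafka/bin/kafka-topics.sh --version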


# IV. Configuration and Startup

# 1. Common configuration (same on all nodes)

mkdir -p /usr/local/kafka/logdirs
# Comment out the existing setting
sed -i "s/^log.dirs=.*/#&/" /usr/local/kafka/config/server.properties
# Append the new setting (falls back to appending at the end of the file if no commented line is found)
grep -q '^#log.dirs=' /usr/local/kafka/config/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs' /usr/local/kafka/config/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs' >> /usr/local/kafka/config/server.properties
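To confirm the change, print the active (uncommented) value; it should now point at the new directory:

grep '^log.dirs=' /usr/local/kafka/config/server.properties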


# 2. ZooKeeper connection settings (optional)

If needed, the ZooKeeper connection settings can also be changed:

# Target values:
# zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
# zookeeper.connection.timeout.ms=18000
# Comment out the existing setting
sed -i "s/^zookeeper.connect=.*/#&/" /usr/local/kafka/config/server.properties
# Append the new setting (falls back to appending at the end of the file if no commented line is found)
grep -q '^#zookeeper.connect=' /usr/local/kafka/config/server.properties && sed -i '/^#zookeeper.connect=.*/a\zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka' /usr/local/kafka/config/server.properties || echo 'zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka' >> /usr/local/kafka/config/server.properties


# 3. Per-node configuration

(1) Configure the broker ID: set broker.id to 0 on node1, 1 on node2, and 2 on node3. Run only the line that matches the node you are on (a hostname-based sketch follows the block below).

sed "s/^broker.id=.*/#&/" /usr/local/kafka/config/server.properties  -i
# node1
sed -i '/^#broker.id=.*/a\broker.id=0' /usr/local/kafka/config/server.properties
# node2
sed -i '/^#broker.id=.*/a\broker.id=1' /usr/local/kafka/config/server.properties
# node3
sed -i '/^#broker.id=.*/a\broker.id=2' /usr/local/kafka/config/server.properties
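If the same script is pushed to all three machines, the broker ID can instead be derived from the local hostname, so nothing has to be edited per node by hand. A minimal sketch, assuming the hostnames are exactly node1, node2, and node3 and that the commenting-out step above has already been run:

# Pick broker.id from the local hostname (illustrative mapping; adjust to your hosts)
case "$(hostname)" in
  node1) BROKER_ID=0 ;;
  node2) BROKER_ID=1 ;;
  node3) BROKER_ID=2 ;;
esac
sed -i "/^#broker.id=/a\\broker.id=${BROKER_ID}" /usr/local/kafka/config/server.properties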


# 4. Start ZooKeeper and Kafka

# Start ZooKeeper first if it is not already running (the trailing & backgrounds it)
cd /usr/local/kafka && ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
# Start the broker; the -daemon flag runs it in the background (omit it to run in the foreground)
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/server.properties
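To check that the broker actually came up, look for the Kafka JVM process (jps ships with the JDK), or list the broker IDs registered under the /kafka chroot in ZooKeeper; both are optional sanity checks:

jps | grep -i kafka
/usr/local/kafka/bin/zookeeper-shell.sh node1:2181/kafka ls /brokers/ids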

# 5. Stop Kafka

# Stop command
cd /usr/local/kafka && ./bin/kafka-server-stop.sh


# V. KRaft Mode

# 1. Cluster planning

node1, node2, and node3 all act as combined broker and controller nodes.

If node4 and node5 are added later, configure those two as broker-only nodes (see the process.roles sketch below).
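The split between combined nodes and broker-only nodes is controlled by process.roles in config/kraft/server.properties. A sketch of the relevant lines (the broker-only variant is only needed if node4/node5 are added later):

# node1-node3: combined broker + controller
process.roles=broker,controller
# node4/node5: broker only
# process.roles=broker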

# 2. Common configuration (same on all nodes)

mkdir -p /usr/local/kafka/logdirs2
# Comment out the existing settings
sed -i "s/^controller.quorum.voters=.*/#&/" /usr/local/kafka/config/kraft/server.properties
sed -i "s/^log.dirs=.*/#&/" /usr/local/kafka/config/kraft/server.properties
# Append the new settings (fall back to appending at the end of the file if no commented line is found)
grep -q '^#controller.quorum.voters=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#controller.quorum.voters=.*/a\controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093' /usr/local/kafka/config/kraft/server.properties || echo 'controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#log.dirs=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs2' /usr/local/kafka/config/kraft/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs2' >> /usr/local/kafka/config/kraft/server.properties
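As before, the active values can be double-checked after editing:

grep -E '^(controller.quorum.voters|log.dirs)=' /usr/local/kafka/config/kraft/server.properties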


# 3. Per-node configuration

sed "s/^node.id=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
sed "s/^advertised.listeners=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
#node1
grep -q '^#node.id=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#node.id=.*/a\node.id=1' /usr/local/kafka/config/kraft/server.properties || echo 'node.id=1' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://node1:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://node1:9092' >> /usr/local/kafka/config/kraft/server.properties

#node2
grep -q '^#node.id=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#node.id=.*/a\node.id=2' /usr/local/kafka/config/kraft/server.properties || echo 'node.id=2' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://node3:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://node3:9092' >> /usr/local/kafka/config/kraft/server.properties

#node3
grep -q '^#node.id=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#node.id=.*/a\node.id=3' /usr/local/kafka/config/kraft/server.properties || echo 'node.id=3' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://node3:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://node3:9092' >> /usr/local/kafka/config/kraft/server.properties
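As with broker.id earlier, node.id and advertised.listeners can also be derived from the local hostname, which avoids editing each node separately. A minimal sketch, assuming the hostnames are exactly node1, node2, and node3 and that the commenting-out step above has already been run:

# Map hostname -> node.id (illustrative mapping; adjust to your hosts)
case "$(hostname)" in
  node1) NODE_ID=1 ;;
  node2) NODE_ID=2 ;;
  node3) NODE_ID=3 ;;
esac
sed -i "/^#node.id=/a\\node.id=${NODE_ID}" /usr/local/kafka/config/kraft/server.properties
sed -i "/^#advertised.listeners=/a\\advertised.listeners=PLAINTEXT://$(hostname):9092" /usr/local/kafka/config/kraft/server.properties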

# 4. Initialize the cluster storage directories

Generate a unique cluster ID and format the Kafka storage directories with it.

# On a single machine both commands can be run as-is; in a cluster, generate the ID once and use the same ID when formatting every node
/usr/local/kafka/bin/kafka-storage.sh random-uuid

# Replace i6O4t5ABTR6FfZrfkGBVEw with the ID produced by the command above
/usr/local/kafka/bin/kafka-storage.sh format -t i6O4t5ABTR6FfZrfkGBVEw -c /usr/local/kafka/config/kraft/server.properties
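A small sketch that captures the generated ID in a shell variable, assuming the value is then copied to the other nodes and reused verbatim there:

# Generate once (on any node), then use the same value everywhere
CLUSTER_ID=$(/usr/local/kafka/bin/kafka-storage.sh random-uuid)
echo "${CLUSTER_ID}"
# Format this node's storage directory with that ID
/usr/local/kafka/bin/kafka-storage.sh format -t "${CLUSTER_ID}" -c /usr/local/kafka/config/kraft/server.properties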

# 5. Start Kafka

# Start command; the -daemon flag runs the broker in the background (omit it to run in the foreground)
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

# 6. Stop Kafka

# Stop command
cd /usr/local/kafka && ./bin/kafka-server-stop.sh


# VI. Verification

1. First, create a topic named chenlj:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chenlj

2. Once the topic is created, list the existing topics:

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

3. Describe the chenlj topic:

./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092  --topic chenlj

4. Next, start a console producer to publish messages to the chenlj topic:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic chenlj

5. Then start a console consumer to read messages from the chenlj topic:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chenlj

Messages typed into the producer should now appear on the consumer side.
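For a non-interactive smoke test, a message can be piped through the producer and read back with the consumer (--from-beginning replays the topic from the start, --max-messages exits after one record):

echo "hello kafka" | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic chenlj
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chenlj --from-beginning --max-messages 1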

# VII. Environment Variables (make the kafka commands available globally)

cat >>/etc/profile.d/kafka.sh <<EOF
export KAFKA_HOME=/usr/local/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF
source /etc/profile

# VIII. Remote Access Configuration

# 1. Open the firewall port

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
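If the cluster runs in KRaft mode, the controller port used in the configuration above (9093) presumably also needs to be reachable between the nodes:

firewall-cmd --zone=public --add-port=9093/tcp --permanent
firewall-cmd --reload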

# IX. Auto-start on Boot

To be completed.
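One possible approach is a systemd unit; a minimal sketch, assuming systemd is available (point ExecStart at config/kraft/server.properties instead when running in KRaft mode):

cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache Kafka
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload && systemctl enable kafka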