# kafka 单机部署
# 一、下载
# 下载kafka-华为镜像站下载的3.0.0版本
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate
1
2
2
# 二、安装
# 解压
tar -zxvf kafka_2.12-3.0.0.tgz -C /usr/local/
# 修改文件夹名称
mv /usr/local/kafka_2.12-3.0.0/ /usr/local/kafka
1
2
3
4
2
3
4
# 三、配置、启动
# 1、修改 logs 文件地址(可选)
mkdir -p /usr/local/kafka/logdirs
#注释掉原有的配置
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/server.properties -i
#加入新配置
grep -q '^#log.dirs=' /usr/local/kafka/config/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs' /usr/local/kafka/config/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs' >> /usr/local/kafka/config/server.properties
1
2
3
4
5
2
3
4
5
# 2、修改 zookeeper 配置(可选)
如果需要,也可以修改 zookeeper 的配置
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
1
2
2
# 3、启动 zookeeper 和 kafka
# 如果需要启动zookeeper的话
cd /usr/local/kafka && ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/server.properties
1
2
3
4
2
3
4
# 4、关闭 kafka
# 关闭命令
cd /usr/local/kafka && ./bin/kafka-server-stop.sh
1
2
3
2
3
# 四、Kraft 模式
# 1、修改配置文件
mkdir -p /usr/local/kafka/logdirs2
#注释掉原有的配置
sed "s/^advertised.listeners=.*/#&/" /usr/local/kafka/config/kraft/server.properties -i
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/kraft/server.properties -i
#加入新配置 **注意监听的ip为公网ip或内网ip
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://119.91.217.34:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://ip:9092' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#log.dirs=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs2' /usr/local/kafka/config/kraft/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs2' >> /usr/local/kafka/config/kraft/server.properties
1
2
3
4
5
6
7
2
3
4
5
6
7
# 2、初始化集群数据目录
生成存储目录唯一 ID 并格式化 kafka 存储目录。
#单机可以直接执行改命令,集群需要分开执行,集群所有机子的唯一id要相同
/usr/local/kafka/bin/kafka-storage.sh random-uuid|xargs -n1 -I {} /usr/local/kafka/bin/kafka-storage.sh format -t {} -c /usr/local/kafka/config/kraft/server.properties
1
2
2
# 3、启动 kafka
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
1
2
2
# 4、关闭 kafka
# 关闭命令
cd /usr/local/kafka && ./bin/kafka-server-stop.sh
1
2
3
2
3
# 五、验证
1.首先创建一个名为 chenlj 的 topic :
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chenlj
1
2.创建完成以后,可以使⽤命令来列出⽬前已有的 topic 列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
1
3.查看 chenlj 主题的详情:
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic chenlj
1
4.接下来创建一个生产者,用于在 chenlj 这个 topic 上生产消息:
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic chenlj
1
5.接下来创建一个消费者,用于在 chenlj 这个 topic 上获取消息:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chenlj
1
此时生产者发出的消息,在消费者端可以获取到。
# 六、配置环境变量(全局可以使用 kafka 命令)
cat >>/etc/profile.d/kafka.sh <<EOF
export KAFKA_HOME=/usr/local/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF
source /etc/profile
1
2
3
4
5
2
3
4
5
# 七、配置远程访问
# 1、开启防火墙端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
1
2
2
# 八、设置开机自启
#待完善
1