# kafka 单机部署

# 一、下载

# 下载kafka-华为镜像站下载的3.0.0版本
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate
1
2

# 二、安装

# 解压
tar -zxvf kafka_2.12-3.0.0.tgz -C /usr/local/
# 修改文件夹名称
mv /usr/local/kafka_2.12-3.0.0/ /usr/local/kafka
1
2
3
4

# 三、配置、启动

# 1、修改 logs 文件地址(可选)

mkdir -p /usr/local/kafka/logdirs
#注释掉原有的配置
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/server.properties  -i
#加入新配置
grep -q '^#log.dirs=' /usr/local/kafka/config/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs' /usr/local/kafka/config/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs' >> /usr/local/kafka/config/server.properties
1
2
3
4
5

# 2、修改 zookeeper 配置(可选)

如果需要,也可以修改 zookeeper 的配置

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
1
2

# 3、启动 zookeeper 和 kafka

# 如果需要启动zookeeper的话
cd /usr/local/kafka && ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/server.properties
1
2
3
4

# 4、关闭 kafka

# 关闭命令
 cd /usr/local/kafka && ./bin/kafka-server-stop.sh

1
2
3

# 四、Kraft 模式

# 1、修改配置文件

mkdir -p /usr/local/kafka/logdirs2
#注释掉原有的配置
sed "s/^advertised.listeners=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
#加入新配置 **注意监听的ip为公网ip或内网ip
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://119.91.217.34:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://ip:9092' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#log.dirs=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs2' /usr/local/kafka/config/kraft/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs2' >> /usr/local/kafka/config/kraft/server.properties
1
2
3
4
5
6
7

# 2、初始化集群数据目录

生成存储目录唯一 ID 并格式化 kafka 存储目录。

#单机可以直接执行改命令,集群需要分开执行,集群所有机子的唯一id要相同
/usr/local/kafka/bin/kafka-storage.sh random-uuid|xargs -n1 -I {} /usr/local/kafka/bin/kafka-storage.sh format -t {} -c /usr/local/kafka/config/kraft/server.properties
1
2

# 3、启动 kafka

# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
 cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
1
2

# 4、关闭 kafka

# 关闭命令
 cd /usr/local/kafka && ./bin/kafka-server-stop.sh

1
2
3

# 五、验证

1.首先创建一个名为 chenlj 的 topic :

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chenlj
1

2.创建完成以后,可以使⽤命令来列出⽬前已有的 topic 列表

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
1

3.查看 chenlj 主题的详情:

./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092  --topic chenlj
1

4.接下来创建一个生产者,用于在 chenlj 这个 topic 上生产消息:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic chenlj
1

5.接下来创建一个消费者,用于在 chenlj 这个 topic 上获取消息:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chenlj
1

此时生产者发出的消息,在消费者端可以获取到。

# 六、配置环境变量(全局可以使用 kafka 命令)

cat >>/etc/profile.d/kafka.sh <<EOF
export KAFKA_HOME=/usr/local/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF
source /etc/profile
1
2
3
4
5

# 七、配置远程访问

# 1、开启防火墙端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
1
2

# 八、设置开机自启

#待完善
1