Kafka
0. Resource Preparation
1. Building a Fully Distributed Cluster (Kafka)
Note: Zookeeper must be installed before downloading and using Kafka!
Kafka official download page: https://kafka.apache.org/downloads
Installation
- Extract the archive to the /opt directory and create a symlink (or simply rename the directory)

```shell
tar -zxvf /software/kafka_2.13-3.7.1.tgz -C /opt/
ln -s /opt/kafka_2.13-3.7.1 /opt/kafka
```
- On hadoop1, go to the /opt/kafka/config directory and edit the configuration file server.properties

```shell
cd /opt/kafka/config/
vim server.properties
```

Modify the following entries:

```
broker.id=1
log.dir=/opt/data/kafka/kafka-logs
listeners=PLAINTEXT://hadoop1:9092
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181
```
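If you prefer not to edit the file interactively in vim, the same four entries can be patched with sed. This is a minimal sketch that demonstrates the substitutions on a throwaway copy under /tmp; the sample file contents are assumptions, and on a real node you would target /opt/kafka/config/server.properties instead.

```shell
# Stand-in for server.properties (assumed default-like values, for demonstration).
cat > /tmp/server.properties <<'EOF'
broker.id=0
log.dir=/tmp/kafka-logs
#listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181
EOF

# Apply the four edits from the section above in one pass.
sed -i -E \
  -e 's|^broker\.id=.*|broker.id=1|' \
  -e 's|^log\.dir=.*|log.dir=/opt/data/kafka/kafka-logs|' \
  -e 's|^#?listeners=.*|listeners=PLAINTEXT://hadoop1:9092|' \
  -e 's|^zookeeper\.connect=.*|zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181|' \
  /tmp/server.properties

cat /tmp/server.properties
```

The `^#?listeners=` pattern also uncomments the listeners line if it was commented out in the default file.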
- Edit the /etc/profile file and add the following lines

```shell
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```

Remember to source the file after editing.
- Distribute to hadoop2 and hadoop3

```shell
scp -r /opt/kafka hadoop2:/opt/
scp -r /opt/kafka hadoop3:/opt/
scp -r /etc/profile hadoop2:/etc
scp -r /etc/profile hadoop3:/etc
```

```shell
ssh hadoop2 "source /etc/profile"
ssh hadoop3 "source /etc/profile"
```
- Edit the /opt/kafka/config/server.properties file on hadoop2 and hadoop3

hadoop2:

```
broker.id=2
listeners=PLAINTEXT://hadoop2:9092
```

hadoop3:

```
broker.id=3
listeners=PLAINTEXT://hadoop3:9092
```
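The per-host edits above follow one pattern: each broker gets a unique broker.id and advertises its own hostname. A minimal sketch of that pattern as a loop, simulated here on local copies under /tmp so it runs anywhere; on the real cluster you would run the sed inside ssh against /opt/kafka/config/server.properties on each host.

```shell
id=2
for host in hadoop2 hadoop3; do
  conf=/tmp/server.properties.$host
  # Stand-in for the file that scp copied from hadoop1 (values assumed).
  printf 'broker.id=1\nlisteners=PLAINTEXT://hadoop1:9092\n' > "$conf"
  # Same substitutions as the manual edits above, one host at a time.
  sed -i -e "s|^broker.id=.*|broker.id=$id|" \
         -e "s|^listeners=.*|listeners=PLAINTEXT://$host:9092|" "$conf"
  id=$((id + 1))
done
cat /tmp/server.properties.hadoop3
```

Duplicate broker.id values will prevent a broker from registering with Zookeeper, which is why the loop increments the id per host.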
Startup
Zookeeper must be started on all three machines.
Kafka must be started on all three machines.
- Start/stop commands (all three machines must be started)
Kafka start command (you cannot start it through the environment variable alone, because the environment variable only adds bin to PATH; the config path must still be given in full):

```shell
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
ssh hadoop2 "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
ssh hadoop3 "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
```
Check the Kafka status on all three machines:

```shell
jps
ssh hadoop2 "jps"
ssh hadoop3 "jps"
```
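What to look for in the jps output: a line containing Kafka (alongside QuorumPeerMain for Zookeeper). A small sketch, with the jps output simulated by printf so it runs anywhere; the process ids and exact daemon list are assumptions for illustration.

```shell
# Simulated jps output from a healthy node (assumed; real pids will differ).
jps_output='1234 QuorumPeerMain
2345 Kafka
3456 Jps'

# The broker is up if a "Kafka" line is present.
echo "$jps_output" | grep -i kafka
```

If the Kafka line is missing on a node, check that node's server.properties and the Zookeeper connection before retrying the start command.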
Kafka stop command (hadoop1 shown as an example; if the script cannot be found, run it with the full path, i.e. add the prefix /opt/kafka/bin):

```shell
kafka-server-stop.sh
ssh hadoop2 "kafka-server-stop.sh"
ssh hadoop3 "kafka-server-stop.sh"
```
- Create a Topic (if the script cannot be found, run it with the full path, i.e. add the prefix /opt/kafka/bin)
1) On hadoop1, go to the Kafka directory /opt/kafka and create a Topic named test.

```shell
kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic test --partitions 3 --replication-factor 1
```
2) List the created Topics.

```shell
kafka-topics.sh --list --bootstrap-server hadoop1:9092
```
- Test communication (hadoop1 and hadoop2 shown as examples)
1) Connect a producer; the command is followed by the corresponding Topic name. (If the script cannot be found, run it with the full path.)
On hadoop1, run:

```shell
kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic test
```
2) Connect a consumer to the same Topic.
On hadoop2, run:

```shell
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test --from-beginning
```

When you type a message on hadoop1, it appears on hadoop2 in real time.
The same works for hadoop3.
Scripts
Kafka can be started, stopped, and status-checked with scripts.
- In the /opt/xshell folder, create the script xjps.sh with the following content

```shell
#!/bin/bash
for i in hadoop1 hadoop2 hadoop3
do
    echo --------- $i ----------
    ssh $i "/opt/jdk/bin/jps $*"
done
```
Run the script to check the status.
- In the /opt/xshell folder, create the script xzookeeper.sh with the following content (skip this if you already did it in the previous chapter)

```shell
#!/bin/bash

# Set JAVA_HOME and update the PATH environment variable
export JAVA_HOME=/opt/jdk
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

# Check the input arguments
if [ $# -ne 1 ]; then
    echo "Usage: $0 {start|stop|status}"
    exit 1
fi

# Perform the requested action
case "$1" in
    start)
        echo "---------- Zookeeper start ------------"
        /opt/zookeeper/bin/zkServer.sh start
        ssh hadoop2 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh start"
        ssh hadoop3 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh start"
        ;;
    stop)
        echo "---------- Zookeeper stop ------------"
        /opt/zookeeper/bin/zkServer.sh stop
        ssh hadoop2 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh stop"
        ssh hadoop3 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh stop"
        ;;
    status)
        echo "---------- Zookeeper status ------------"
        /opt/zookeeper/bin/zkServer.sh status
        ssh hadoop2 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh status"
        ssh hadoop3 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh status"
        ;;
    *)
        echo "Unknown command: $1"
        echo "Usage: $0 {start|stop|status}"
        exit 2
        ;;
esac
```
Run `sh xzookeeper.sh status` to check the Zookeeper status.
- In the /opt/xshell folder, create the script xkafka.sh with the following content

```shell
#!/bin/bash

# Kafka, Zookeeper, and JDK locations
KAFKA_HOME=/opt/kafka
ZOOKEEPER_HOME=/opt/zookeeper
JAVA_HOME=/opt/jdk

# Start Kafka on all three machines
start_kafka() {
    echo "Starting Kafka on hadoop1..."
    $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
    echo "Starting Kafka on hadoop2..."
    ssh hadoop2 "export JAVA_HOME=$JAVA_HOME; export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
    echo "Starting Kafka on hadoop3..."
    ssh hadoop3 "export JAVA_HOME=$JAVA_HOME; export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
}

# Stop Kafka on all three machines
stop_kafka() {
    echo "Stopping Kafka on hadoop1..."
    $KAFKA_HOME/bin/kafka-server-stop.sh
    echo "Stopping Kafka on hadoop2..."
    ssh hadoop2 "export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-stop.sh"
    echo "Stopping Kafka on hadoop3..."
    ssh hadoop3 "export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-stop.sh"
}

# Check Kafka status on all three machines
check_status() {
    echo "Checking Kafka status on hadoop1..."
    ssh hadoop1 "jps | grep -i kafka"
    echo "Checking Kafka status on hadoop2..."
    ssh hadoop2 "jps | grep -i kafka"
    echo "Checking Kafka status on hadoop3..."
    ssh hadoop3 "jps | grep -i kafka"
}

# Handle command-line arguments
case "$1" in
    start)
        start_kafka
        ;;
    stop)
        stop_kafka
        ;;
    status)
        check_status
        ;;
    *)
        echo "Usage: $0 {start|stop|status}"
        exit 1
        ;;
esac
```
Run `sh xkafka.sh status` to check the Kafka status.
2. Common Errors and Solutions
- Script not found on the path: run the file with its full path, i.e. add the prefix /opt/kafka/bin
- After scp, remember to edit the /opt/kafka/config/server.properties file on hadoop2 and hadoop3 (unique broker.id and listeners)
- After scp-ing the system file /etc/profile, remember to source it
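For the "script not found" case, one hedged pattern is to try the PATH lookup first and fall back to the full path. A minimal sketch, assuming the /opt/kafka install location used throughout this guide:

```shell
# Use the PATH version if /etc/profile has been sourced on this node;
# otherwise fall back to the full path (assumed install location /opt/kafka).
KAFKA_STOP=$(command -v kafka-server-stop.sh || echo /opt/kafka/bin/kafka-server-stop.sh)
echo "$KAFKA_STOP"
```

The same pattern works for the other Kafka scripts (kafka-server-start.sh, kafka-topics.sh, and so on).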