Kafka

0. Resource Preparation

  • kafka_2.13-3.7.1.tgz

1. Setting Up a Fully Distributed Kafka Cluster

Note: Zookeeper must be installed before you download and use Kafka!

Kafka download page: https://kafka.apache.org/downloads

Installation

  1. Extract the archive to the /opt directory and create a symbolic link (or just rename the directory):

tar -zxvf /software/kafka_2.13-3.7.1.tgz -C /opt/
ln -s /opt/kafka_2.13-3.7.1 /opt/kafka

  2. On hadoop1, go to the /opt/kafka/config directory and edit the server.properties configuration file:

cd /opt/kafka/config/
vim server.properties

Change the following entries:

broker.id=1
log.dirs=/opt/data/kafka/kafka-logs
listeners=PLAINTEXT://hadoop1:9092
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181
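A missed or misspelled key here is a common cause of startup failure, so it is worth grepping for the four entries before starting the broker. The snippet below is a sketch: for illustration it checks a throwaway copy of the file rather than touching /opt/kafka/config/server.properties; point `conf` at the real file in practice.

```shell
# Sketch: sanity-check that server.properties defines the keys this guide sets.
# We write a throwaway copy here; in practice set conf to the real file path.
conf=$(mktemp)
cat > "$conf" <<'EOF'
broker.id=1
log.dirs=/opt/data/kafka/kafka-logs
listeners=PLAINTEXT://hadoop1:9092
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181
EOF

# Report each required key as present or missing.
for key in broker.id log.dirs listeners zookeeper.connect; do
    if grep -q "^${key}=" "$conf"; then
        echo "ok: ${key}"
    else
        echo "MISSING: ${key}"
    fi
done
```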

  3. Edit the /etc/profile file and add the following lines:

export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

Remember to source the file afterwards (source /etc/profile) so the change takes effect.

  4. Distribute Kafka and /etc/profile to hadoop2 and hadoop3:

scp -r /opt/kafka hadoop2:/opt/
scp -r /opt/kafka hadoop3:/opt/
scp /etc/profile hadoop2:/etc
scp /etc/profile hadoop3:/etc

ssh hadoop2 "source /etc/profile"
ssh hadoop3 "source /etc/profile"

  5. Edit the /opt/kafka/config/server.properties file on hadoop2 and hadoop3.

hadoop2:

broker.id=2
listeners=PLAINTEXT://hadoop2:9092

hadoop3:

broker.id=3
listeners=PLAINTEXT://hadoop3:9092
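Hand-editing two files is fine at this scale, but the per-host changes follow a simple pattern (broker N gets broker.id=N and its own hostname in listeners), so they can be scripted with sed. The sketch below patches local throwaway copies so the effect is easy to inspect; on a real cluster you would wrap the sed command in ssh hadoopN "...". The host names and GNU sed -i flag are this guide's assumptions.

```shell
# Sketch: derive each broker's settings from its position in the host list.
# Throwaway copies stand in for the files that scp placed on each host.
workdir=$(mktemp -d)
id=1
for host in hadoop1 hadoop2 hadoop3; do
    f="$workdir/server.properties.$host"
    # stand-in for the file copied from hadoop1
    printf 'broker.id=1\nlisteners=PLAINTEXT://hadoop1:9092\n' > "$f"
    # rewrite the two per-broker entries in place
    sed -i "s|^broker.id=.*|broker.id=${id}|; s|^listeners=.*|listeners=PLAINTEXT://${host}:9092|" "$f"
    id=$((id + 1))
done
cat "$workdir/server.properties.hadoop3"
```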

Startup

Zookeeper must be running on all three machines.

Kafka must be started on all three machines.

  1. Start/stop commands (Kafka must be started on all three machines)

Kafka start command (you cannot rely on the environment variable alone here: /etc/profile only put bin on the PATH, not config, so the properties file must be given by full path):

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
ssh hadoop2 "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
ssh hadoop3 "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
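Because -daemon returns immediately while the broker is still coming up, a script that starts Kafka and then immediately creates topics can race the startup. One way to avoid that is a small helper that polls the listener port; wait_for_port is a hypothetical name, and the sketch relies on bash's /dev/tcp pseudo-device.

```shell
# Sketch: poll a host:port until it accepts TCP connections.
# Returns non-zero if the port never opens within the given number of tries.
wait_for_port() {
    local host=$1 port=$2 tries=${3:-10} i
    for (( i = 0; i < tries; i++ )); do
        # the subshell exits 0 only if the connection succeeds
        if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
            return 0
        fi
        sleep 1
    done
    return 1
}
# e.g. wait_for_port hadoop1 9092 30 || echo "broker did not come up"
```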

Check the Kafka status on all three machines:

jps
ssh hadoop2 "jps"
ssh hadoop3 "jps"

Kafka stop command (hadoop1 as an example; if the shell cannot find the script, run it by full path, i.e. with the /opt/kafka/bin prefix):

kafka-server-stop.sh
ssh hadoop2 "kafka-server-stop.sh"
ssh hadoop3 "kafka-server-stop.sh"
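The "run by full path" tip that recurs in these notes can be folded into scripts so they work whether or not /opt/kafka/bin is on the PATH. A minimal sketch of the pattern (the /opt/kafka prefix is this guide's install location):

```shell
# Sketch: prefer the command found on PATH; fall back to the install prefix.
KAFKA_STOP=$(command -v kafka-server-stop.sh || echo /opt/kafka/bin/kafka-server-stop.sh)
echo "would run: $KAFKA_STOP"
```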

  2. Create a topic (if the script is not found, run it by full path, i.e. with the /opt/kafka/bin prefix).

1) On hadoop1, go to the /opt/kafka directory and create a topic named test:

kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic test --partitions 3 --replication-factor 1

2) List the topics that have been created:

kafka-topics.sh --list --bootstrap-server hadoop1:9092

  3. Test messaging (using hadoop1 and hadoop2 as an example)

1) Start a console producer; the topic name must follow the command. (If the script is not found, run it by full path.)

Run the following on hadoop1:

kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic test

2) Start a console consumer subscribed to the same topic.

Run the following on hadoop2:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test --from-beginning

Messages typed into the producer on hadoop1 now appear in real time in the consumer on hadoop2.

The same works on hadoop3.
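The interactive check above can also be scripted: pipe one message into the producer, then read one message back with the consumer's --max-messages and --timeout-ms options. The sketch below is guarded so it only attempts the round trip when the Kafka scripts are actually installed at this guide's paths; otherwise it just reports that it is skipping.

```shell
# Sketch: non-interactive round-trip test of the "test" topic.
# Degrades to a skip message on machines where Kafka is not installed.
PRODUCER=/opt/kafka/bin/kafka-console-producer.sh
CONSUMER=/opt/kafka/bin/kafka-console-consumer.sh
if [ -x "$PRODUCER" ] && [ -x "$CONSUMER" ]; then
    echo "hello from script" | "$PRODUCER" --bootstrap-server hadoop1:9092 --topic test
    "$CONSUMER" --bootstrap-server hadoop1:9092 --topic test --from-beginning \
        --max-messages 1 --timeout-ms 10000
else
    echo "Kafka scripts not found; skipping round-trip test"
fi
```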

Scripts

The scripts below start/stop Kafka and check its status.

  1. Create the script xjps.sh in the /opt/xshell directory with the following content:
#!/bin/bash
for i in hadoop1 hadoop2 hadoop3
do
    echo --------- $i ----------
    ssh $i "/opt/jdk/bin/jps $*"
done

Run the script to check the status of all three machines.

  2. Create the script xzookeeper.sh in the /opt/xshell directory with the following content (skip if you did this in the previous chapter):
#!/bin/bash

# Set JAVA_HOME and update the PATH
export JAVA_HOME=/opt/jdk
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

# Check arguments
if [ $# -ne 1 ]; then
    echo "Usage: $0 {start|stop|status}"
    exit 1
fi

# Dispatch on the requested action
case "$1" in
start)
    echo "---------- Zookeeper start ------------"
    /opt/zookeeper/bin/zkServer.sh start
    ssh hadoop2 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh start"
    ssh hadoop3 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh start"
    ;;
stop)
    echo "---------- Zookeeper stop ------------"
    /opt/zookeeper/bin/zkServer.sh stop
    ssh hadoop2 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh stop"
    ssh hadoop3 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh stop"
    ;;
status)
    echo "---------- Zookeeper status ------------"
    /opt/zookeeper/bin/zkServer.sh status
    ssh hadoop2 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh status"
    ssh hadoop3 "export JAVA_HOME=/opt/jdk; export PATH=\$PATH:\$JAVA_HOME/bin; /opt/zookeeper/bin/zkServer.sh status"
    ;;
*)
    echo "Unknown command: $1"
    echo "Usage: $0 {start|stop|status}"
    exit 2
    ;;
esac

Run sh xzookeeper.sh status to check the Zookeeper status.

  3. Create the script xkafka.sh in the /opt/xshell directory with the following content:
#!/bin/bash

# Kafka / Zookeeper / JDK locations
KAFKA_HOME=/opt/kafka
ZOOKEEPER_HOME=/opt/zookeeper
JAVA_HOME=/opt/jdk

# Start Kafka on all three brokers
start_kafka() {
    echo "Starting Kafka on hadoop1..."
    $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

    echo "Starting Kafka on hadoop2..."
    ssh hadoop2 "export JAVA_HOME=$JAVA_HOME; export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"

    echo "Starting Kafka on hadoop3..."
    ssh hadoop3 "export JAVA_HOME=$JAVA_HOME; export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
}

# Stop Kafka on all three brokers
stop_kafka() {
    echo "Stopping Kafka on hadoop1..."
    $KAFKA_HOME/bin/kafka-server-stop.sh

    echo "Stopping Kafka on hadoop2..."
    ssh hadoop2 "export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-stop.sh"

    echo "Stopping Kafka on hadoop3..."
    ssh hadoop3 "export KAFKA_HOME=$KAFKA_HOME; $KAFKA_HOME/bin/kafka-server-stop.sh"
}

# Check whether a Kafka process is running on each broker
check_status() {
    echo "Checking Kafka status on hadoop1..."
    ssh hadoop1 "jps | grep -i kafka"

    echo "Checking Kafka status on hadoop2..."
    ssh hadoop2 "jps | grep -i kafka"

    echo "Checking Kafka status on hadoop3..."
    ssh hadoop3 "jps | grep -i kafka"
}

# Handle command-line arguments
case "$1" in
start)
    start_kafka
    ;;
stop)
    stop_kafka
    ;;
status)
    check_status
    ;;
*)
    echo "Usage: $0 {start|stop|status}"
    exit 1
esac

Run sh xkafka.sh status to check the Kafka status.

2. Common Errors and Solutions

  1. Script not found on the PATH: run it by full path, i.e. with the /opt/kafka/bin prefix.
  2. After scp, remember to edit the /opt/kafka/config/server.properties file on hadoop2 and hadoop3.
  3. After scp'ing the system file /etc/profile, remember to source it.