kafka之本地搭建及使用

1.环境说明

主机环境:centos7
openjdk version “1.8.0_65”
kafka 1.1.0
zookeeper 3.4.10

2.安装包下载

kafka版本为1.1.0,kafka下载命令:

1
curl -L -O https://mirrors.cnnic.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

zookeeper版本为3.4.10,下载命令为:

1
curl -L -O http://apache.claz.org/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

3.安装

3.1 安装jdk

  • (1)卸载系统自带jdk
    查看系统中中的jdk
    1
    $ rpm -qa | grep java
    如果有可以使用以下命令批量卸载所有带有Java的文件
    1
    $ rpm -qa | grep java | xargs rpm -e --nodeps
  • (2)安装jdk
1
$ rpm -ivh jdk-7u80-linux-x64.rpm

(3)检查jdk是否安装成功

1
$ java --version

3.2 安装zookeeper

  • (1)解压
    1
    2
    3
    $ tar zxvf zookeeper-3.4.10.tar.gz
    $ mv -rf zookeeper-3.4.10 /usr/local/zookeeper-3.4.10
    $ cd /usr/local/zookeeper-3.4.10
  • (2)配置zookeeper环境变量

在shell中执行下面的脚本,即可完成环境变量的添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cat << 'EOF' >> /etc/profile
JAVA_HOME=/usr/java/jdk1.7.0_80
#zookeeper环境变量设置
ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.10
PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$ZOOKEEPER_HOME/lib:
TOMCAT_HOME=/usr/local/tomcat7
CATALINA_HOME=/usr/local/tomcat7
export ZOOKEEPER_HOME
export JAVA_HOME
export PATH
export CLASSPATH
export TOMCAT_HOME
export CATALINA_HOME
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH
EOF
  • (3)配置

配置文件存放在$ZOOKEEPER_HOME/conf/目录下,将zoo_sample.cfd文件名称改为zoo.cfg, 缺省的配置内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

配置说明:

  • tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  • dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
  • dataLogDir: log目录, 同样可以是任意目录. 如果没有设置该参数, 将使用和dataDir相同的设置。
  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
  • (4)启动Zookeeper

当这些配置项配置好后,启动zookeeper

1
./zkServer.sh start

zookeeper默认监听端口为2181,
其他相关命令:

1
netstat -tunlp|grep 2181 #查看zookeeper端口
1
./zkServer.sh stop #停止zookeeper

3.3 安装kafka

(1)解压

1
2
3
$ tar zxvf kafka_2.11-1.1.0
$ mv -rf kafka_2.11-1.1.0 /usr/local/kafka_2.11-1.1.0
$ cd /usr/local/kafka_2.11-1.1.0

(2)配置
进入kafka安装工程根目录编辑config/server.properties文件,添加配置:

1
listeners=PLAINTEXT://<your-ip>:9092

(3)启动kafka-server

1
bin/kafka-server-start.sh config/server.properties &     #后台启动kafka

至此,kafka单机环境搭建完毕。

(4)测试及使用

cd至kafka目录,运行如下命令创建一个名为test的topic:

1
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test

执行以下命令可查看当前的topic列表

1
bin/kafka-topics.sh --list --zookeeper zookeeper:2181

运行一个消息生产者,指定 topic 为刚刚创建的 test ,输入一些测试消息。

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

运行一个消息消费者,同样指定 topic 为 test,可以接收到生产者发送的消息。

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
-------------本文结束感谢您的阅读-------------
0%