kafka安装

kafka安装

hao Lv4

Apache Kafka is a distributed streaming platform

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services

首先讲一下Kafka中涉及的一些名词概念

  • Broker: 服务代理.实质上就是kafka集群的一个物理节点.
  • Topic: 特定类型的消息流.”消息”是字节的有效负载(payload),话题是消息的分类的名或种子名
  • Partition: Topic的子概念.一个Topic可以有多个Partition,但一个Partition只属于一个Topic.此外,Partition则是Consumer消费的基本单元.消费时.每个消费线程最多只能使用一个Partition.一个topic中partition的数量,就是每个user group中消费该topic的最大并行数量.
  • UserGroup: 为了便于实现MQ中多播,重复消费等引入的概念.如果ConsumerA和ConsumerB属于同一个UserGroup,那么对于ConsumerA消费过的数据,ConsumerB就不能再消费了.也就是说,同一个user group中的consumer使用同一套offset
  • Offset: Offset是专门对于Partition和UserGroup而言的,用于记录某个UserGroup在某个Partition中当前已经消费到达的位置.
  • Producer: 生产者,能够发布消息到话题的任何对象.直接向某topic下的某partition发送数据.leader负责主备策略、写入数据、发送ack.
  • Consumer: 消费者.可以订阅一个或者多个话题,通过fetch的方式从Broker中拉取数据,从而消费这些已经发布的信息的对象.kafka server不直接负责每个consumer消费到了哪,所以需要client和zk联合维护每个partition读到了哪里,即offset

搭建Kafka集群

1. 搭建

如果你的机器可以联机互联网的话就再好不过了,这将节省很多时间而且操作更简单,如果是离线环境则需要将用的tar包下载自行拷贝到目标机器.
我这里以linux的机器为例.

首先安装zookeeper

因为是可以连接互联网,所以我们就不使用kafka自带的zk了,来安装一个吧,反正也很简单
zookeeper官网里我们找到了Download页面,根据里面提供的镜像地址直接是有wget下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5.tar.gz
等待zk下载并解压完毕后,进入zk的目录下面cd apache-zookeeper-3.5.5/conf
这里包含了所有zk的配置文件(也就三个~)

蓝天白云

我们只需要拷贝一份zoo_sample.cfg并作出修改

1
2
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg

编辑之后的内容

1
2
3
4
5
6
7
8
tickTime=2000
initLimit=10
syncLimit=5
#目录自行创建
dataDir=/data/zookeeper/zk1/data
dataLogDir=/data/zookeeper/zk1/log
clientPort=2181
server.1=[你的host]:2888:3888

其中dataDir和dataLogDir根据你自己创建的目录来写.
如果你想要在一台机器上模拟一个集群启动多个zk节点的话,就重复以上操作多拷贝zoo.cfg文件,例如我需要三个节点的zk集群,

1
2
3
cp zoo_sample.cfg zoo1.cfg
cp zoo_sample.cfg zoo2.cfg
cp zoo_sample.cfg zoo3.cfg

并对每个cfg文件作出修改,数据与日志目录不要重复,并且在每一份文件中都要写入如下内容(端口可自定)

1
2
3
server.1=192.168.178.22:2287:3387
server.2=192.168.178.22:2288:3388
server.3=192.168.178.22:2289:3389

然后到每个zk节点的dataDir目录下面创建myid文件

1
2
cd /data/zookeeper/zk1/data
echo "1" > myid

三个节点都要创建对应的myid文件,值要根据上面cfg文件中写的server.id一一对应

接下来就是启动zk了

1
2
3
4
5
6
7
8
9
cd ../zookeeper/bin
./zkServer.sh start ../conf/zoo1.cfg
./zkServer.sh start ../conf/zoo2.cfg
./zkServer.sh start ../conf/zoo3.cfg
# 然后查看zk节点的状态
./zkServer.sh status ../conf/zoo2.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo2.cfg
Mode: leader

三个节点的zk集群启动成功,id为2的zk节点是leader

安装Kafka

同样我们在Kafka官网里找到下载页面,然后找到最新的镜像地址直接下载压缩包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
下载并解压完成后进到目录里cd kafka/config

蓝天白云

同样我们假设启动两个broker的kafka集群,拷贝两份server.properties文件并做修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
cp server.properties server1.properties
# 修改以下内容
broker.id=1
listeners=PLAINTEXT://192.168.178.22:9092
log.dirs=/data/kafka/logs/kafka-logs-1
zookeeper.connect=192.168.178.22:2182,192.168.178.22:2181,192.168.178.22:2183
...
# 配置直接删除topic
delete.topic.enable=true
# 虽然不明白原因,但是不加这两个启动不起来..
advertised.host.name=192.168.178.22
advertised.port=9092

cp server.properties server2.properties
# 修改以下内容
broker.id=2
listeners=PLAINTEXT://192.168.178.22:9093
log.dirs=/data/kafka/logs/kafka-logs-2
zookeeper.connect=192.168.178.22:2182,192.168.178.22:2181,192.168.178.22:2183
...
delete.topic.enable=true
advertised.host.name=192.168.178.24
advertised.port=9093

启动两个kafka的broker

1
2
3
4
cd /data/kafka/kafka-2.11/bin
# 将日志写入文件并置于后台启动
kafka-server-start.sh ../config/server1.properties > broker1.log 2>&1 &
kafka-server-start.sh ../config/server2.properties > broker2.log 2>&1 &

至此,kafka搭建完成,可以使用jobs命令查看一下任务状况

1
2
3
[root@test-hao config]# jobs
[1]- Running kafka-server-start.sh ../config/server2.properties > broker2.log 2>&1 & (wd: /data/kafka/kafka_2.11-2.2.1/bin)
[2]+ Running kafka-server-start.sh ../config/server1.properties > broker1.log 2>&1 & (wd: /data/kafka/kafka_2.11-2.2.1/bin)

2. 测试Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
cd /data/kafka/kafka-2.11/bin
#创建主题
./kafka-topics.sh \
--create \
--bootstrap-server 192.168.178.22:9092,192.168.178.22:9093 \
--replication-factor 2 \
--partitions 3 \
--topic test

# 查看broker的情况
./kafka-topics.sh --describe \
--zookeeper 10.4.121.218:3333 \
--topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
# 配置情况:分区数据在节点broker.id=0、主节点是broker.id=1、副本集是broker.id=1,0,2、isr是broker.id=1,0,2
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

# 生产者
./kafka-console-producer.sh \
--broker-list 192.168.178.22:9092,192.168.178.22:9093 \
--topic test
...
test1
test2
^C

# 消费者
# 在开启一个窗口启动消费者可以看到在实时消费数据
./kafka-console-consumer.sh \
--bootstrap-server 192.168.178.22:9092,192.168.178.22:9093 \
--from-beginning \
--topic test
...
test1
test2
^C

# 删除
./kafka-topics.sh --delete \
--zookeeper 192.168.178.22:2181 \
--topic test
# 如果在server.properties文件中没有添加 delete.topic.enable=true 这一项,topic只是被标记了删除,并没有真正被删除,还需要去zk里进行手动删除,所以我一般是添加这一项的(也不会经常出现需要去删除一个topic的情况)
On this page
kafka安装