kafka-生产实战

kafka简介

Apache Kafka是由Apache软件基金会开发的一个开源消息系统项目,由Scala写成。Kafka最初是由LinkedIn开发,并于2011年初开源。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
Kafka是一个分布式的、分区的、多复本的日志提交服务。它通过一种独一无二的设计提供了一个消息系统的功能。

kafka应用场景

  • Kafka可以应用于消息系统,比如,当下较为热门的消息推送,这些消息推送系统的消息源,可以使用Kafka作为系统的核心组建来完成消息的生产和消息的消费。
  • 然后是网站的行迹,我们可以将企业的Portal,用户的操作记录等信息发送到Kafka中,按照实际业务需求,可以进行实时监控,或者做离线处理等。
  • 一个是日志收集,类似于Flume套件这样的日志收集系统,但Kafka的设计架构采用push/pull,适合异构集群,Kafka可以批量提交消息,对Producer来说,在性能方面基本上是无消耗的,而在Consumer端中,我们可以使用HDFS这类的分布式文件存储系统进行存储。

相关术语

  • Kafka维护按类区分的消息,称为主题(topic)
  • 生产者(producer)向kafka的主题发布消息
  • 消费者(consumer)向主题注册,并且接收发布到这些主题的消息
  • kafka以一个拥有一台或多台服务器的集群运行着,每一台服务器称为broker
  • 从高层来说,生产者(producer)通过网络发消息到kafka集群,而kafka集群则以下面这种方式对消费者进行服务。

kafka实战

1
2
3
4
5
6
7
cd /usr/local/src/
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
tar -xzf kafka_2.11-0.10.1.0.tgz
mv kafka_2.11-0.10.1.0 /data/app/
ln -s /data/app/kafka_2.11-0.10.1.0 /data/app/kafka
/data/app/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
/data/app/kafka/bin/kafka-server-start.sh config/server.properties

kafka单节点实战

编辑zookeeper配置文件

1
vim /data/app/kafka/config/zookeeper.properties

编辑kafka配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
vim /data/app/kafka/config/server.properties
broker.id=0
port=9092
advertised.host.name=10.0.2.150
num.network.threads=3

num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:4188
zookeeper.connection.timeout.ms=6000

#这三个必须得配置
#这个是配置PRODUCER/CONSUMER连上来的时候使用的地址(必须得配置)
advertised.host.name=192.168.56.12
#设置KAFKA LOG路径
log.dirs=$KAFKA_HOME/logs/kafka-logs
#设置ZOOKEEPER的连接地址
zookeeper.connect=192.168.56.12:2181

启动kafka服务

1
sh /data/app/kafka/bin/kafka-server-start.sh /data/app/kafka/config/server.properties >/dev/null 2>&1 &

kafka集群实战

创建三个配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cat >/data/app/kafka/config/server.properties-0<<EOF
broker.id=0
port=9090
advertised.host.name=10.0.2.150
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-0
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2880,127.0.0.1:2881,127.0.0.1:2882
zookeeper.connection.timeout.ms=6000
EOF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
cat >/data/app/kafka/config/server.properties-1<<EOF
broker.id=1
port=9091
advertised.host.name=10.0.2.150
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2880,127.0.0.1:2881,127.0.0.1:2882
zookeeper.connection.timeout.ms=6000
EOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
cat >/data/app/kafka/config/server.properties-2<<EOF
broker.id=2
port=9092
advertised.host.name=10.0.2.150
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2880,127.0.0.1:2881,127.0.0.1:2882
zookeeper.connection.timeout.ms=6000
EOF

启动kafka集群

1
2
3
./kafka-server-start.sh /data/app/kafka/config/server.properties >/dev/null 2>&1 &
./kafka-server-start.sh /data/app/kafka/config/server.properties-1 >/dev/null 2>&1 &
./kafka-server-start.sh /data/app/kafka/config/server.properties-2 >/dev/null 2>&1 &

创建一个topic 一个分区,三个主机

1
2
[root@localhost kafka]# ./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2880  --replication-factor 3 --partitions 1 --topic topics
Created topic "topics".
  • –topic指定topic name
  • –partitions指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好;
  • –replication-factor指定partition的replicas数,建议设置为2;
  • KAFKA有几个,replication-factor就填几个

查看topics的信息

1
2
3
4
[root@localhost kafka]# ./bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2881 --topic topics
Topic:topics PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topics Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[root@localhost kafka]#
  • Leader:负责处理消息的读和写,Leader是从所有节点中随机选择的。
  • Replicas:列出了所有的副本节点,不管节点是否在服务中。
  • Isr:是正在服务中的节点

kafka创建topic

1
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

kafka管理

kafka配置管理

  • advertised.host.name=eventbus1.kafka
  • num.network.threads=3 broker处理消息的最大线程数,一般情况下数量为cpu核数
  • num.io.threads: Kafka broker 处理磁盘 IO 的线程数
  • socket.receive.buffer.bytes: socket的接收缓冲区大小
  • socket.send.buffer.bytes: socket的发送缓冲区大小
  • socket.request.max.bytes=104857600 socket请求的最大数值
  • log.dirs=/var/log/kafka 消息日志存放位置
  • num.partitions: 创建 topic 如果不指定分区数时的默认值
  • log.flush.interval.messages=1000 表示每当消息记录数达到1000时flush一次数据到磁盘
  • log.flush.interval.ms=1000 表示每间隔1000毫秒flush一次数据到磁盘
  • log.retention.bytes: topic 每个分区的最大文件大小
  • log.retention.hours: 消息保留的最大时间
  • log.segment.bytes =102410241024 topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
  • log.retention.check.interval.ms=30000000 文件大小检查的周期时间
  • auto.create.topics.enable: 自动创建 topic
  • default.replication.factor: 创建 topic 如果不指定复制因子时的默认值
  • delete.topic.enable: 是否支持删除 topic
  • message.max.bytes: 消息的最大尺寸
  • num.replica.fetchers: 从分区 Leader 复制消息的线程数
  • queued.max.requests: 等待 IO 线程处理的请求队列最大数,若是等待 IO 的请求超过这个数值,就会停止接受外部消息
  • zookeeper.connect = localhost:2181 zookeeper集群的地址,可以是多个用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
  • zookeeper.session.timeout.ms=6000 ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
  • zookeeper.connection.timeout.ms =6000 ZooKeeper的连接超时时间
  • zookeeper.sync.time.ms =2000 ZooKeeper集群中leader和follower之间的同步时间

这里配置broker的时候,每台机器上的broker保证唯一,从0开始。如:在另外2台机器上分别配置broker.id=1,broker.id=2

kafka配置优化

网络和io操作线程配置优化

1
2
3
4
5
6
7
8
9
# broker处理消息的最大线程数
num.network.threads=xxx
# broker处理磁盘IO的线程数
num.io.threads=xxx

建议配置:
一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.

log数据文件刷新策略

1
2
3
4
5
6
为了大幅度提高producer写入吞吐量,需要定期批量写文件。
建议配置:
# 每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000

日志保留策略配置

当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。

1
2
3
4
5
6
7
log.retention.hours=72
# 保留三天,也可以更短

log.segment.bytes=1073741824
# 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,
# kafka启动时是单线程扫描目录(log.dir)下所有数据文件)

开启自动创建配置:

1
2
auto.create.topics.enable=true
使用程序直接往kafka中相应的topic发送数据,如果topic不存在就会按默认配置进行创建。

创建topic

1
sh kafka-topics.sh --create --topic topic --replication-factor 1 --partitions 1 --zookeeper localhost:4188

KAFKA有几个,replication-factor就填几个

我们查看该Topic的相关信息

1
kafka-topics.sh --zookeeper localhost:4188 --topic topic --describe

查看都有哪些topic

1
./bin/kafka-topics.sh --list --zookeeper localhost:2181

模拟数据的生产和消费

使用producer生产消息

1
2
3
4
[root@localhost bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic topic
[2015-12-10 13:54:40,460] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello
你好

使用consumer去消费消息

1
2
3
[root@localhost bin]# sh kafka-console-consumer.sh --zookeeper 10.0.2.150:4188 --topic topic --from-beginning
hello
你好

修改topic的partition

1
2
3
4
## 通过kafka-topics.sh工具的alter命令,将topic_test的partitions从12增加到20

./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –alter –partitions 20 –topic topic_test

修改kafka的分片配置

操作步骤如下:

操作,是指手动写扩充replicas的配置文件,然后使用工具进行操作

查看topic的详细信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
lizhitao@users-MacBook-Pro-2:~$ ./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –describe –topic test.example
Topic:test.example PartitionCount:12 ReplicationFactor:1 Configs:
Topic: test.example Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test.example Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test.example Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: test.example Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: test.example Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: test.example Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: test.example Partition: 6 Leader: 0 Replicas: 0 Isr: 0
Topic: test.example Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: test.example Partition: 8 Leader: 2 Replicas: 2 Isr: 2
Topic: test.example Partition: 9 Leader: 0 Replicas: 0 Isr: 0
Topic: test.example Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: test.example Partition: 11 Leader: 2 Replicas: 2 Isr: 2

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
将原有replicas为[0]扩充为[0,4], [1]扩充为[1,5],[2]扩充为[2,3]
[sankuai@data-kafka01 kafka]$ cat partitions-to-move.json
{
“partitions”:
[
{
“topic”: “test.example”,
“partition”: 0,
“replicas”: [0,4]
},
.....
{
“topic”: “test.example”,
“partition”: 11,
“replicas”: [2,3]
}
],
“version”:1
}

执行

1
./bin/kafka-reassign-partitions.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –reassignment-json-file partitions-to-move.json –execute

检查修改情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[sankuai@data-kafka01 kafka]$ ./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –describe –topic test.example
Topic:test.example PartitionCount:12 ReplicationFactor:2 Configs:
Topic: test.example Partition: 0 Leader: 0 Replicas: 0,4 Isr: 0,4
Topic: test.example Partition: 1 Leader: 1 Replicas: 1,5 Isr: 1,5
Topic: test.example Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test.example Partition: 3 Leader: 0 Replicas: 0,4 Isr: 0,4
Topic: test.example Partition: 4 Leader: 1 Replicas: 1,5 Isr: 1,5
Topic: test.example Partition: 5 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test.example Partition: 6 Leader: 0 Replicas: 0,4 Isr: 0,4
Topic: test.example Partition: 7 Leader: 1 Replicas: 1,5 Isr: 1,5
Topic: test.example Partition: 8 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test.example Partition: 9 Leader: 0 Replicas: 0,4 Isr: 0,4
Topic: test.example Partition: 10 Leader: 1 Replicas: 1,5 Isr: 1,5
Topic: test.example Partition: 11 Leader: 2 Replicas: 2,3 Isr: 2,3

kafka性能优化

1
2
3
4
5
####  配置jmx服务
kafka server中默认是不启动jmx端口的,需要用户自己配置
vim bin/kafka-run-class.sh
#最前面添加一行
JMX_PORT=8060

kafka监控和告警

通过使用,个人总结以上三种监控程序的优缺点:

Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。

Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。

KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。

若只需要监控功能,推荐使用KafkaOffsetMonito,若偏重Kafka集群管理,推荐使用Kafka Manager。

因为都是开源程序,稳定性欠缺。故需先了解清楚目前已存在哪些Bug,多测试一下,避免出现类似于Kafka Web Console的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
描述:所有的topic的消息速率(消息数/秒)
Mbean名:"kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics"
正常的值:

描述:所有的topic的流入数据速率(字节/秒)
Mbean名:"kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"
正常的值:

描述:producer或Fetch-consumer或Fetch-follower的请求速率(请求次数/秒)
Mbean名:"kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"
正常的值:

描述:所有的topic的流出数据速率(字节/秒)
Mbean名: "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"
正常的值:

描述:刷日志的速率和耗时
Mbean名: "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"
正常的值:

描述:正在做复制的partition的数量(|ISR| < |all replicas|)
Mbean名:"kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager"
正常的值:0

描述:当前的broker是否为controller
Mbean名:"kafka.controller":name="ActiveControllerCount",type="KafkaController"
正常的值:在集群中只有一个broker的这个值为1

描述:选举leader的速率
Mbean名:"kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats"
正常的值:如果有broker挂了,此值非0

描述:Unclean的leader选举速率
Mbean名:"kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats"
正常的值:0

描述:该broker上的partition的数量
Mbean名: "kafka.server":name="PartitionCount",type="ReplicaManager"
正常的值:应在各个broker中平均分布

描述:Leader的replica的数量
Mbean名: "kafka.server":name="LeaderCount",type="ReplicaManager"
正常的值:应在各个broker中平均分布

描述:ISR的收缩(shrink)速率
Mbean名:"kafka.server":name="ISRShrinksPerSec",type="ReplicaManager"
正常的值:如果一个broker挂掉了,一些partition的ISR会收缩。当那个broker重新起来时,一旦它的replica完全跟上,ISR会扩大(expand)。除此之外,正常情况下,此值和下面的扩大速率都是0。

描述:ISR的扩大(expansion)速率
Mbean名: "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager"
正常的值:参见ISR的收缩(shrink)速率

描述:follower落后leader replica的最大的消息数量
Mbean名:"kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"
正常的值:小于replica.lag.max.messages

描述:每个follower replica落后的消息速率
Mbean名:"kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics"
正常的值:小于replica.lag.max.messages

描述:等待producer purgatory的请求数
Mbean名:"kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"
正常的值:如果ack=-1,应为非0值

描述:等待fetch purgatory的请求数
Mbean名:"kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"
正常的值:依赖于consumer的fetch.wait.max.ms的设置

描述:一个请求(producer,Fetch-Consumer,Fetch-Follower)耗费的所有时间
Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"
正常的值:包括了queue, local, remote和response send time

描述:请求(producer,Fetch-Consumer,Fetch-Follower)在请求队列中的等待时间
Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"
正常的值:

描述:请求(producer,Fetch-Consumer,Fetch-Follower)在leader处理请求花的时间
Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"
正常的值:

描述:请求(producer,Fetch-Consumer,Fetch-Follower)等待follower花费的时间
Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"
正常的值:producer的ack=-1时,非0才正常

描述:发送响应花费的时间
Mbean名:"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"
正常的值:

描述:consumer落后producer的消息数量
Mbean名:"kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"
正常的值:

建议对GC耗时和其他参数和诸如系统CPU,I/O时间等等进行监控。在client端,建议对"消息数量/字节数"的速率(全局的和对于每一个topic),请求的"速率/大小/耗时"进行监控。还有consumer端,所有partition的最大的落后情况和最小的fetch请求的速率。consumer为了能跟上,最大落后数量需要少于一个threshold并且最小fetch速率需要大于0.

kafka客户端汇总

pip install kafka-python
kafka官方文档

帮助文档

分布式消息中间件应用实践
http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka-distributed/
Apache kafka 工作原理介绍
http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html

kafka 快速入门手册
http://www.blogjava.net/paulwong/archive/2014/05/11/413506.html

kafka 集群安装与扩容
http://my.oschina.net/MaTech/blog/292090
http://my.oschina.net/MaTech/blog/292090

kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转)

kafka 性能测试
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

青云kafka集群介绍

https://docs.qingcloud.com/guide/queue.html#id7

kafka python+zabbix 监控脚本
http://club.oneapm.com/t/zabbix-kafka/854