Kafka操作命令详解:从主题管理到性能测试
Kafka操作命令详解:从主题管理到性能测试
本文主要介绍了Kafka的基本操作命令,包括主题管理、消息生产和消费、消费者组管理以及性能测试等内容。文章内容详实,包含了具体的命令示例和参数说明,适合Kafka初学者和中级用户参考。
05、Kafka 操作命令
1、主题命令
(1)创建主题
kafka-topics.sh --create --bootstrap-server <broker_list> --topic test1 --partitions 4 --replication-factor 3
--bootstrap-server
:设置Kafka执行节点--topic
:主题名称--partitions
:设置分区数,可以用于并发消费。--replication-factor
:设置副本因子,数量不能大于Kafka节点数。
(2)查看主题
kafka-topics.sh --list --bootstrap-server <broker_list>
(3)在配置的文件夹下,可以查看到topic对应的文件夹,其中分区数量取决于创建时设置的partitions参数。在其他机器上也会有对应的副本文件夹,数量取决于replication-factor参数。
(4)查询主题描述信息
kafka-topics.sh --describe --bootstrap-server <broker_list> --topic test1
ReplicationFactor
:表示副本数Partition
:表示当前的分区号Replicas
:表示副本存放的机器Leader
:表示三个副本中的leaderIsr
:表示当前可用的机器id
(5)修改主题
分区数partitions
可以修改,只能比原来的大。replication-factor
一旦确定就不能修改了。
kafka-topics.sh --alter --bootstrap-server <broker_list> --topic test1 --partitions 6
(6)删除主题
kafka-topics.sh --delete --bootstrap-server <broker_list> --topic test
2、生产消息
给test1
主题发送消息:
kafka-console-producer.sh --bootstrap-server <broker_list> --topic test1
>hello world
如果给一个不存在的主题发送消息,Kafka会自动创建该主题,并默认设置partitions
和replication-factor
为1。但建议手动创建topic,因为partitions
可以修改,而replication-factor
不允许修改。
3、消费消息
(1)消费最新数据
消费test1
主题下的消息:
kafka-console-consumer.sh --bootstrap-server <broker_list> --topic test1
(2)消费历史数据
消费test1
主题下的历史消息:
--from-beginning
表示从头开始消费:
kafka-console-consumer.sh --bootstrap-server <broker_list> --topic test1 --from-beginning
--offset earliest --partition 1
表示从1号分区的头部开始消费:
kafka-console-consumer.sh --bootstrap-server <broker_list> --topic test1 --offset earliest --partition 1
--offset latest --partition 1
表示从1号分区的尾部开始消费:
kafka-console-consumer.sh --bootstrap-server <broker_list> --topic test1 --offset latest --partition 1
--offset 2 --partition 1
从1号分区的offset为2的位置开始消费:
kafka-console-consumer.sh --bootstrap-server <broker_list> --topic test1 --offset 2 --partition 1
4、消费者组
消费者组其实就是一个容器,可以容纳若干个消费者,每一个消费者必须被包含在一个消费者组里面。
(1)通过--group
来指定消费者组。
kafka-console-consumer.sh --bootstrap-server <broker_list> --topic test1 --group my-group1
(2)查看已有的消费者组
kafka-consumer-groups.sh --bootstrap-server <broker_list> --list
(3)删除消费者组
删除消费者组,必须要先保证消费者组下没有消费者:
kafka-consumer-groups.sh --bootstrap-server <broker_list> --delete --group my-group1
(4)查看消费的位置
kafka-consumer-groups.sh --describe --bootstrap-server <broker_list> --group my-group1
PARTITION
:表示分区号CURRENT-OFFSET
:当前消费到的offset下标LOG-END-OFFSET
:当前分区最新的offset下标LAG
:是偏移量,未消费的数量。
消费详情:
Kafka消费者在消费数据的时候,都是分组消费的,不同的消费者组之间没有影响,都会去消费。在同一个组内消费,一条消息只能被一个消费者消费,同时一个分区数据只能被一个消费者消费,如果消费者数量大于分区数,则多余出来的消费者永远不会消费消息。如果分区数大于消费者,则会均匀的将分区分配给多个消费者。
因此,Kafka的topic的partition个数代表是Kafka的topic的并行度,同一时间最多可以有多个线程来消费topic的数据,所以如果要提高Kafka的topic的消费能力,应该增大partition的个数。
5、手动平衡 leader:
# 使用的脚本是:kafka-leader-election.sh
# 必要的参数:
# --bootstrap-server:指定服务器列表
# --election-type:选举的类型,默认选择preferred即可
# 下面的三个参数三选一:
# --all-topic-partitions:平衡所有的主题、所有的分区
# --topic:平衡指定的主题,如果选择这个参数,则必须使用--partition指定分区
# --path-to-json-file:将需要平衡的主题、分区信息写入一个json文件,指定这个文件
# json的格式: {"partitions": [{"topic": "test", "partition": 1}, {"topic": "test", "partition": 2}]}
kafka-leader-election.sh \
--bootstrap-server <broker_list> \
--election-type preferred \
--all-topic-partitions
6、kafka 自带的压力测试工具
用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
(1)生产者Producer压测
kafka-producer-perf-test.sh \
--topic test \
--record-size 100 \
--num-records 100000 \
--throughput -1 \
--producer-props bootstrap.servers=<broker_list>
record-size
:一条信息有多大,单位字节num-records
:总共发送多少条信息throughput
:每秒多少条信息,设置成-1,表示不限流,可测生产者最大吞吐量producer-props
:发送端端消息配置
结果:
100000 records sent, 27495.188342 records/sec (2.62 MB/sec), 1461.75 ms avg latency, 2183.00 ms max latency, 1696 ms 50th, 2103 ms 95th, 2177 ms 99th, 2181 ms 99.9th.
解析:
- 一共写入10万条消息
- 吞吐量为2.62 MB/sec
- 每次写入的平均延迟为1461.75ms
- 最大延迟2183.00 ms
(2)消费者 Consumer 压力测试
consumer测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能
kafka-consumer-perf-test.sh \
--broker-list <broker_list> \
--topic test \
--fetch-size 10000 \
--messages 10000000 \
--threads 1
broker-list
:节点地址topic
:指定topic名称fetch-size
:指定每个fetch的数据大小messages
:总共要消费的消息个数
结果:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-06-27 13:17:57:490, 2020-06-27 13:18:11:751, 20.0272, 1.4043, 210000, 14725.4751, 1593235077858, -1593235063597, -0.0000, -0.0001
解释:
- 开始时间
- 结束时间
- 共消费数据:20.0272M
- 吞吐量:1.4043MB/s
- 共消费数据:210000条
- 每秒消费:14725.4751条