问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

查看Kafka消息消费堆积情况

创作时间:
作者:
@小白创作中心

查看Kafka消息消费堆积情况

引用
CSDN
1.
https://blog.csdn.net/baidu_38432732/article/details/136515906

Kafka作为分布式消息队列系统,在大数据处理和实时数据流应用中扮演着重要角色。监控Kafka消息消费情况,特别是消费堆积(Lag)情况,对于保证系统稳定性和数据处理效率至关重要。本文将详细介绍如何使用Kafka自带的命令行工具查看消息消费堆积情况,并深入解析相关概念和计算方法。

Kafka命令行工具使用示例

查看主题命令

  • 展示topic列表
./kafka-topics.sh --list --zookeeper zookeeper_ip:2181
  • 描述topic
./kafka-topics.sh --describe --zookeeper zookeeper_ip:2181 --topic topic_name
  • 查看topic某分区偏移量最大(小)值
./kafka-run-class.sh kafka.tools.GetOffsetShell --topic topic_name --time -1 --broker-list broker_ip:9092 --partitions 0
  • 增加topic分区数
./kafka-topics.sh --zookeeper zookeeper_ip:2181 --alter --topic test --partitions 10
  • 删除topic:慎用,只会删除zookeeper中的元数据,消息文件须手动删除

  • 方法一:

    ./kafka-topics.sh --delete --zookeeper zookeeper_ip:2181 --topic topic_name
    
  • 方法二:待验证

    ./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper zookeeper_ip:2181 --topic topic_name
    
  • 查看topic消费进度,必须参数为–group, 不指定–topic,默认为所有topic,

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group_name
  • 列出所有主题中的所有用户组:
./kafka-consumer-groups.sh --bootstrap-server broker_ip:9092 --list
  • 要使用ConsumerOffsetChecker查看上一个示例中消费者组的偏移量

  • 按如下所示“describe”消费者组:

    ./kafka-consumer-groups.sh --bootstrap-server 10.1.3.84:9098 --describe --group group_name
    
  • 统计指定group下对应各个topic的消息量

./kafka-consumer-groups.sh --bootstrap-server 10.1.3.84:9098  --group group_name --describe | awk '{print $2, $6}' | grep -v "LAG\|^ "|awk '{topics[$1] += $2} END {for (topic in topics) print topic ": " topics[topic]}'
  • 例如
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group SPOT_MARKET --describe | awk '{print $2, $6}' | grep -v "LAG\|^ "|awk '{topics[$1] += $2} END {for (topic in topics) print topic ": " topics[topic]}'
NEEX_SPOT_DISPATCH_ORDER_FILL: 0
NEEX_SPOT_DISPATCH_LEVEL2: 12
NEEX_SPOT_DISPATCH_ORDER: 11
  • -members: 此选项提供使用者组中所有活动成员的列表。
./kafka-consumer-groups.sh --bootstrap-server broker_ip:9092 --describe --group group_name --members
  • 按topic和分区统计各个topic的分区的消息堆积情况
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group SPOT_MARKET --describe | awk 'BEGIN {OFS="\t"} NR==1 {print "Topic", "Partition", "LAG"} NR>2 {topics[$2 ":" $3] += $6} END {for (topic in topics) {split(topic, parts, ":"); print parts[1] , parts[2] , topics[topic]}}' | awk 'NR==1; NR>1 {print $0 | "sort -k1,1 -k2,2rn"}'
Topic	Partition	LAG
NEEX_SPOT_DISPATCH_LEVEL2	49	0
NEEX_SPOT_DISPATCH_LEVEL2	48	0
NEEX_SPOT_DISPATCH_LEVEL2	47	0
NEEX_SPOT_DISPATCH_LEVEL2	46	0
NEEX_SPOT_DISPATCH_LEVEL2	45	0
NEEX_SPOT_DISPATCH_LEVEL2	44	0
NEEX_SPOT_DISPATCH_LEVEL2	43	0
NEEX_SPOT_DISPATCH_LEVEL2	42	0
NEEX_SPOT_DISPATCH_LEVEL2	41	0
NEEX_SPOT_DISPATCH_LEVEL2	40	0
NEEX_SPOT_DISPATCH_LEVEL2	39	0
NEEX_SPOT_DISPATCH_LEVEL2	38	0
NEEX_SPOT_DISPATCH_LEVEL2	37	0
NEEX_SPOT_DISPATCH_LEVEL2	36	0
NEEX_SPOT_DISPATCH_LEVEL2	35	0
NEEX_SPOT_DISPATCH_LEVEL2	34	0
NEEX_SPOT_DISPATCH_LEVEL2	33	0
NEEX_SPOT_DISPATCH_LEVEL2	32	0
NEEX_SPOT_DISPATCH_LEVEL2	31	0
NEEX_SPOT_DISPATCH_LEVEL2	30	0
NEEX_SPOT_DISPATCH_LEVEL2	29	0
NEEX_SPOT_DISPATCH_LEVEL2	28	0
NEEX_SPOT_DISPATCH_LEVEL2	27	0
NEEX_SPOT_DISPATCH_LEVEL2	26	0
NEEX_SPOT_DISPATCH_LEVEL2	25	0
NEEX_SPOT_DISPATCH_LEVEL2	24	0
NEEX_SPOT_DISPATCH_LEVEL2	23	0
NEEX_SPOT_DISPATCH_LEVEL2	22	0
NEEX_SPOT_DISPATCH_LEVEL2	21	0
NEEX_SPOT_DISPATCH_LEVEL2	20	0
NEEX_SPOT_DISPATCH_LEVEL2	19	0
NEEX_SPOT_DISPATCH_LEVEL2	18	0
NEEX_SPOT_DISPATCH_LEVEL2	17	0
NEEX_SPOT_DISPATCH_LEVEL2	16	0
NEEX_SPOT_DISPATCH_LEVEL2	15	0
NEEX_SPOT_DISPATCH_LEVEL2	14	0
NEEX_SPOT_DISPATCH_LEVEL2	13	0
NEEX_SPOT_DISPATCH_LEVEL2	12	4
NEEX_SPOT_DISPATCH_LEVEL2	11	0
NEEX_SPOT_DISPATCH_LEVEL2	10	0
NEEX_SPOT_DISPATCH_LEVEL2	9	0
NEEX_SPOT_DISPATCH_LEVEL2	8	0
NEEX_SPOT_DISPATCH_LEVEL2	7	0
NEEX_SPOT_DISPATCH_LEVEL2	6	0
NEEX_SPOT_DISPATCH_LEVEL2	5	0
NEEX_SPOT_DISPATCH_LEVEL2	4	0
NEEX_SPOT_DISPATCH_LEVEL2	3	0
NEEX_SPOT_DISPATCH_LEVEL2	2	0
NEEX_SPOT_DISPATCH_LEVEL2	1	0
NEEX_SPOT_DISPATCH_LEVEL2	0	0

Kafka消息消费堆积原理与计算方法

消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。

对于Kafka而言,消息被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,虽然Partition可以继续细分为若干个段文件(Segment),但是对于上层应用来说可以将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。

每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。

上图中有四个概念:

  • LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
  • ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
  • HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  • LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。

比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。

要计算Kafka中某个消费者的滞后量很简单,首先看看其消费了几个Topic,然后针对每个Topic来计算其中每个Partition的Lag,每个Partition的Lag计算就显得非常的简单了,参考下图:

由图可知消费Lag=HW - ConsumerOffset。Kafka中自带的kafka-consumer_groups.sh脚本中就有Lag的信息,示例如下:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 2 0 0 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 5 0 0 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 1 0 0 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 3 0 0 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 6 0 0 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 0 1 1 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 4 0 0 0 consumer-2-70947d01-8796-46a0-8df7-27bdb9a19e9e
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 11 0 0 0 consumer-3-1eff2fbc-fd4f-41c3-be60-2cd60e463617
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 7 0 0 0 consumer-3-1eff2fbc-fd4f-41c3-be60-2cd60e463617
depth NEEX_SPOT_DISPATCH_STORAGE_ORDER_FILL 10 0 0 0 consumer-3-1eff2fbc-fd4f-41c3-be60-2cd60e463617

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号