Kafka命令行操作指南
创作时间:
作者:
@小白创作中心
Kafka命令行操作指南
引用
CSDN
1.
https://blog.csdn.net/weixin_48935611/article/details/138920940
1. Kafka概述
2. 主题(Topic)命令行操作
Kafka 主题(Topic)是 Kafka 中用于组织和存储消息的逻辑单元。在 Kafka 中,生产者(Producer)将消息发布到主题,消费者(Consumer)从主题中订阅消息。
创建主题(Create Topic)
- 示例:创建一个名为 my_topic 的主题,包含 3 个分区和每个分区的副本数为 2。
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic --partitions 3 --replication-factor 2
bootstrap-server:指定 Kafka 集群的地址和端口。create:表示创建主题的操作。topic:指定要创建的主题名称。partitions:指定主题的分区数。replication-factor:指定每个分区的副本数。
列出所有主题(List Topics)
- 示例:列出 Kafka 集群中所有的主题名称。
kafka-topics.sh --bootstrap-server localhost:9092 --list
查看主题详情(Describe Topic)
- 示例:查看名为 my_topic 的主题的详细信息,包括分区、副本分配情况等。
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_topic
删除主题(Delete Topic)
- 示例:删除名为 my_topic 的主题。
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my_topic
修改主题配置(Alter Topic Configuration)
- 示例:修改名为 my_topic 的主题的最大消息字节数为 1 MB。
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my_topic --alter --add-config max.message.bytes=1048576
增加分区(Add Partitions)
- 示例:将名为 my_topic 的主题的分区数增加到 5。
注意:分区数只能增加,不能减少
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic --partitions 5
查看主题的配置(View Topic Configuration)
- 示例:查看名为 my_topic 的主题的配置信息,如副本数、清理策略等。
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my_topic --describe
查找主题的消费者(Find Consumers for Topic)
- 示例:查找订阅了名为 my_topic 的主题的消费者组。
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --all-groups --topic my_topic
3. 生产者(Producer)命令行操作
发送消息(Produce Messages)
- 示例:启动一个交互式的控制台生产者,将消息发送到名为 my_topic 的主题。
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_topic
bootstrap-server:指定 Kafka 集群的地址和端口。topic:指定要发送消息的目标主题。
指定消息键发送(Produce Messages with Keys)
- 示例:启动一个交互式的控制台生产者,向名为 my_topic 的主题发送带有键的消息。
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_topic --property "parse.key=true" --property "key.separator=:"
property "parse.key=true":表示消息包含键。property "key.separator=:":指定键值对中键和值的分隔符。
从文件中发送消息(Produce Messages from File)
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_topic < messages.txt
- 示例:从名为 messages.txt 的文件中读取消息,并将其发送到名为 my_topic 的主题。
自定义分区器发送消息(Produce Messages with Custom Partitioner)
- 示例:启动一个交互式的控制台生产者,并使用自定义的分区器将消息发送到名为 my_topic 的主题。
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_topic --property "partitioner.class=my.custom.Partitioner"
property "partitioner.class=my.custom.Partitioner":指定自定义的分区器类。
设置消息发送速率(Setting Message Sending Rate)
- 示例:使用性能测试工具以指定的速率向名为 my_topic 的主题发送消息。
kafka-producer-perf-test.sh --topic my_topic --throughput 100 --num-records 1000000 --record-size 100 --producer-props bootstrap.servers=localhost:9092
throughput:指定消息发送速率(消息/秒)。num-records:指定要发送的消息总数。record-size:指定每条消息的大小。
4. 消费者(Consumer)命令行操作
消费消息(Consume Messages)
示例:启动一个交互式的控制台消费者,从名为 my_topic 的主题消费消息,并从起始处开始。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
bootstrap-server:指定 Kafka 集群的地址和端口。topic:指定要消费消息的目标主题。from-beginning:从主题的起始处开始消费消息。
指定消费者组消费消息(Consume Messages with Consumer Group)
- 示例:启动一个交互式的控制台消费者,将其添加到名为 my_group 的消费者组中,并从名为 my_topic 的主题消费消息。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --group my_group
group:指定消费者组的名称。
消费指定分区的消息(Consume Messages from Specific Partition)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --partition 0
partition:指定要消费消息的分区。
示例:启动一个交互式的控制台消费者,从名为 my_topic 的主题的分区 0 消费消息。
消费指定偏移量的消息(Consume Messages from Specific Offset)
- 示例:启动一个交互式的控制台消费者,从名为 my_topic 的主题的偏移量 1234 开始消费消息。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --offset 1234
offset:指定要消费消息的偏移量。
消费消息并输出到文件(Consume Messages and Output to File)
- 示例:启动一个控制台消费者,从名为 my_topic 的主题消费消息,并将消息输出到名为 output.txt 的文件中。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning > output.txt
热门推荐
打嗝最快方法:止嗝按摩内关
花生油的营养价值与食用指南
2025高速免费日历出炉!快收藏
“上岸”,是“站点”而非终点
二次元插画师2024年就业形式分析
八种运动减肥法:让你快速减肥不反弹
不同土地利用方式土壤呼吸速率动态研究进展
古月方源为什么叫大爱仙尊
解锁公共图书馆“新玩法”
左眼睛胀痛是什么原因引起的
智能跌倒监测:科技之盾守护老人居家安全
如何探索稀土资源的开发利用?这种开发利用如何实现可持续发展?
2月起,中日直飞航线将大幅增加!
战争加剧气候危机,俄乌、巴以冲突已产生上亿吨碳排放
舆情应对策略如何制定?五大步骤详解!
劳务派遣人员的职业攀升:法律框架与实践探索
江南大学人才培养特色怎么样?培养拔尖创新人才,国家所需!
公司经营不善待岗降薪合法吗
谋刺摄政王载沣:汪精卫的“青葱岁月”,一场向死而生的暗杀
入选15次全明星有多难?历史仅7人做到,科比18次只排第三
【深度】港股IPO定价机制改革背后,打新江湖巨变
一种适合糖尿病人的高效走路法,走15分钟顶半小时!
金银花的栽培技术与管理方法是什么?
客户关系管理nps选项是什么
b5大还是a5大
车辆长时间不开,如何避免内饰老化?
家族墓地怎么设计?从排列到设计的全面解析!
被学位论文盲审“误判”卡住的硕博生
如何判断VMware虚拟机是否联网
泉州文化遗产的法律意义与保护——海上丝绸之路的文化见证