Kafka和Zookeeper的基本概念及使用方法
Kafka和Zookeeper的基本概念及使用方法
Kafka和Zookeeper是分布式系统中常用的组件,Kafka主要用于构建实时数据管道和流应用,而Zookeeper则用于解决分布式系统中的一致性问题。本文将详细介绍这两个组件的基本概念及其使用方法。
什么是Kafka和Zookeeper?
Kafka概念:
Kafka 是一种分布式流处理平台,最初由 LinkedIn 开发,后成为 Apache 开源项目。它主要用于构建实时数据管道和流应用,具有高吞吐量、低延迟和可扩展性。
Zookeeper概念:
ZooKeeper 是一个分布式协调服务,由 Apache 开源,用于解决分布式系统中的一致性问题(如配置管理、节点选举、分布式锁等)。
补充:(什么是MQ?)
MQ的作用:
MessageQueue,消息队列。队列,是一种FIFO先进先出的数据结构,消息则是跨进程传递的数据,一个典型的MQ系统,会将消息由生产者发送到MQ进行排队,根据排序由消息的消费者进行处理。
作用:
- 异步:异步能够提高系统的响应速度、吞吐量。
- 解耦:
- 服务之间进行解耦,才可以减少服务之间影响,提高系统整体稳定性和可扩展性。
- 解耦可以实现数据分发,生产者发送消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或减少对生产者没有影响。
- 削峰:以稳定的系统资源应对突发的流量冲击。
特点:
- 数据吞吐量很大;
- 集群容错性高;
- 功能不需要太复杂;
- 允许少量数据丢失;
Kafka核心概念
- Producer(生产者)
生产者是向 Kafka 主题(Topic)发布消息的客户端应用程序。生产者将消息发送到 Kafka 集群,消息会被存储在指定的主题中。
- Consumer(消费者)
消费者是从 Kafka 主题订阅并处理消息的客户端应用程序。消费者从 Kafka 集群读取消息并进行处理。
- Broker(代理)
Broker 是 Kafka 集群中的单个服务器节点,负责存储和传递消息。一个 Kafka 集群通常由多个 Broker 组成,以实现高可用性和负载均衡。
- Topic(主题)
主题是消息的类别或流名称。生产者将消息发布到特定的主题,消费者从主题订阅消息。
- Partition(分区)
分区是主题的物理分片,每个分区是一个有序、不可变的消息序列。分区允许 Kafka 并行处理数据。
- Offset(偏移量)
偏移量是消息在分区中的唯一标识符,用于标识消息的位置。消费者通过偏移量来跟踪已经读取的消息。
- Consumer Group(消费者组)
消费者组是一组共同消费一个主题的消费者。Kafka 确保每条消息只被消费者组内的一个消费者处理。
- Replication(复制)
复制是 Kafka 实现高可用性和容错性的机制。每个分区可以有多个副本,分布在不同的 Broker 上。
- Zookeeper
Zookeeper 是 Kafka 的分布式协调服务,用于管理 Kafka 集群的元数据和状态。
- Log(日志)
Kafka 使用日志来存储消息。每个分区的消息都存储在一个日志文件中,日志文件按时间或大小分段。
- Connector(连接器)
连接器是 Kafka 提供的工具,用于将 Kafka 与其他系统集成。Kafka Connect 是一个框架,用于构建和运行连接器。
- Streams(流)
Kafka Streams 是一个客户端库,用于构建流处理应用程序。它允许开发者以简单的方式处理和分析 Kafka 中的数据流。
准备及部署
1. 准备工作
服务器数量
ZooKeeper:至少3台服务器(奇数节点,避免脑裂问题)。
Kafka:至少3台服务器(建议与ZooKeeper分开部署,但也可复用)。
环境要求
所有服务器之间网络互通(开放端口:ZooKeeper默认2181/2888/3888,Kafka默认9092)。
关闭防火墙或配置规则允许相关端口。
安装 Java 8+(ZooKeeper和Kafka依赖Java环境)。
Docker容器通常是临时的,所以需要挂载卷来保存Zookeeper的数据目录和Kafka的日志目录,避免容器重启后数据丢失
2. 部署 ZooKeeper 集群
(1) 创建 Docker 自定义网络
# 在每台服务器上创建同一名称的覆盖网络(若跨主机需配置 Swarm 或 Calico)
docker network create zk-kafka-net
(2) 配置 ZooKeeper
- 在每台服务器上创建以下目录和文件:
mkdir -p /opt/docker/zookeeper/{data,conf}
- 配置 zoo.cfg
编辑 /opt/docker/zookeeper/conf/zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data
clientPort=2181
server.1=zk-node1:2888:3888
server.2=zk-node2:2888:3888
server.3=zk-node3:2888:3888
- 写入 myid
在每台服务器的 /opt/docker/zookeeper/data 目录下创建 myid 文件:
# 在 zk-node1 执行
echo "1" > /opt/docker/zookeeper/data/myid
# 在 zk-node2 执行
echo "2" > /opt/docker/zookeeper/data/myid
# 在 zk-node3 执行
echo "3" > /opt/docker/zookeeper/data/myid
(3) 使用 Docker 启动 ZooKeeper
- 在每台服务器上运行以下命令(替换ZK_SERVER_ID 和 ZK_SERVERS):
docker run -d \
--name zookeeper \
--network zk-kafka-net \
-p 2181:2181 \
-p 2888:2888 \
-p 3888:3888 \
-e ZOO_MY_ID=1 \ # 与 myid 对应
-e ZOO_SERVERS="server.1=zk-node1:2888:3888;2181 server.2=zk-node2:2888:3888;2181 server.3=zk-node3:2888:3888;2181" \
-v /opt/docker/zookeeper/data:/data \
-v /opt/docker/zookeeper/conf/zoo.cfg:/conf/zoo.cfg \
zookeeper:3.8.1
3. 部署 Kafka 集群
(1) 配置 Kafka
- 在每台服务器上创建目录:
mkdir -p /opt/docker/kafka/data
(2) 使用 Docker 启动 Kafka
- 在每台服务器上运行以下命令(替换broker_id和kafka_advertised_listener):
docker run -d \
--name kafka \
--network zk-kafka-net \
-p 9092:9092 \
-e KAFKA_BROKER_ID=1 \ # 唯一 ID(每台不同)
-e KAFKA_ZOOKEEPER_CONNECT=zk-node1:2181,zk-node2:2181,zk-node3:2181 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-node1:9092 \ # 改为当前节点 IP/域名
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE="false" \
-v /opt/docker/kafka/data:/var/lib/kafka/data \
bitnami/kafka:3.4.0
4. 验证集群
(1) 检查 ZooKeeper 集群状态
docker exec -it zookeeper zkServer.sh status
# 应输出 Mode: leader 或 Mode: follower
(2) 测试 Kafka 集群
# 创建 Topic
docker exec -it kafka \
kafka-topics.sh --create \
--topic test \
--bootstrap-server kafka-node1:9092 \
--partitions 3 \
--replication-factor 2
# 生产消息
docker exec -it kafka \
kafka-console-producer.sh \
--topic test \
--bootstrap-server kafka-node1:9092
# 消费消息(在另一终端执行)
docker exec -it kafka \
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server kafka-node1:9092
5. 使用 Docker Compose 简化部署
若所有节点在同一网络,可使用 docker-compose.yml 统一管理。
示例 docker-compose.yml(单节点模板,需修改后分发到每台服务器):
version: '3'
services:
zookeeper:
image: zookeeper:3.8.1
networks:
- zk-kafka-net
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zk-node1:2888:3888;2181 server.2=zk-node2:2888:3888;2181 server.3=zk-node3:2888:3888;2181
volumes:
- /opt/docker/zookeeper/data:/data
- /opt/docker/zookeeper/conf/zoo.cfg:/conf/zoo.cfg
kafka:
image: bitnami/kafka:3.4.0
networks:
- zk-kafka-net
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zk-node1:2181,zk-node2:2181,zk-node3:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-node1:9092
volumes:
- /opt/docker/kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
zk-kafka-net:
driver: bridge
6. 常见问题
- 节点无法通信
- 检查防火墙规则和Docker网络配置。
- 确保kafka_advertised_listeners使用正确的IP/域名。
- 数据持久化失败
- 确保挂载目录(如/opt/docker/zookeeper/data)有写的权限。
- ZooKeeper选举失败
- 确认zoo_servers配置一致,且myid文件正确。