问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Spring Boot 基于 SCRAM 认证集成 Kafka 的详解

创作时间:
作者:
@小白创作中心

Spring Boot 基于 SCRAM 认证集成 Kafka 的详解

引用
CSDN
1.
https://blog.csdn.net/qq_53723451/article/details/142324947

在现代微服务架构中,Kafka作为消息中间件被广泛使用,而安全性则是其中的一个关键因素。本文将详细介绍如何在Spring Boot应用中集成Kafka并使用SCRAM认证机制进行安全连接,并实现动态创建账号、ACL权限、Topic,以及生产者和消费者等操作。

需要准备一个配置了SCRAM认证的Kafka环境,可参考《基于SASL/SCRAM让Kafka实现动态授权认证》进行部署。

一、添加依赖

在Spring Boot项目的pom.xml中添加spring-kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

二、配置Kafka

application.yml中配置Kafka的相关属性,包括服务器地址、认证信息等:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: SCRAM-SHA-256
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="your_username" password="your_password";
    consumer:
      group-id: test-consumer-group
      auto-offset-reset: earliest
      properties:
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • bootstrap-servers: Kafka的集群地址
  • security.protocol: 通讯协议指定启用SASL
  • sasl.mechanism: 指定SASL使用的具体身份验证机制
  • sasl.jaas.config: 指定认证模块的处理类以及"用户名"和"密码"
  • auto-offset-reset: 指定偏移量的逻辑,"earliest"代表新加入的消费者都是从头开始消费

三、动态管理资源

3.1. 创建KafkaAdminClient

KafkaAdminClient用于管理Kafka资源(用户、ACL、主题等)。以下是示例代码:

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) {
        return (KafkaAdminClient) KafkaAdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
}

3.2. 动态创建用户和设置权限

使用Kafka AdminClient API实现动态创建用户和设置ACL权限:

/**
 * 创建用户
 */
public void createUser(String userName, String password) throws ExecutionException, InterruptedException {
    // 构造Scram认证机制信息
    ScramCredentialInfo info = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192);
    //用户信息
    UserScramCredentialAlteration userScramCredentialAdd = new UserScramCredentialUpsertion(userName, info, password);
    AlterUserScramCredentialsResult result = kafkaAdminClient.alterUserScramCredentials(List.of(userScramCredentialAdd));
    result.all().get();
}

/**
 * 配置用户只读权限
 */
public void createAcl(String account, String topicName, String consumerGroup) {
    AclBinding aclBindingTopic = genAclBinding(account, ResourceType.TOPIC, topicName, AclOperation.READ);
    AclBinding aclBindingGroup = genAclBinding(account, ResourceType.GROUP, consumerGroup, AclOperation.READ);
    kafkaAdminClient.createAcls(List.of(aclBindingTopic, aclBindingGroup));
}

3.3. 动态创建主题

public void createTopic(String topicName, int partitions, short replicationFactor) throws ExecutionException, InterruptedException {
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
    CreateTopicsResult result = kafkaAdminClient.createTopics(List.of(newTopic));
    result.all().get();
}

四、生产者和消费者配置

4.1. 生产者配置

配置Kafka生产者,用于发送消息:

@Service
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public void sendMessage(String message) {
        kafkaTemplate.send("test", message);
    }
}

4.2. 消费者配置

使用@KafkaListener注解实现消费消息方法:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "test", groupId = "test-consumer-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

五、总结

通过以上步骤,我们成功地在Spring Boot应用中集成了Kafka,并使用SCRAM认证机制进行安全连接。确保在生产环境中妥善管理用户凭证,并根据需要调整Kafka的安全配置。

完整样例代码下载:https://gitee.com/zlt2000/kafka-scram-demo

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号