spring关于kafka的最全配置
  以下是基于 Spring Boot 官方文档及多篇实践总结的 application.yml 中 Kafka 的最全配置详解,涵盖生产者、消费者、监听器、安全认证等场景,按模块分类整理:
# ⚙️ 一、公共基础配置
spring:
  kafka:
    # Kafka 集群地址(必填,多节点逗号分隔)
    bootstrap-servers: kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
    # 全局属性扩展(支持所有 Kafka 原生参数)
    properties:
      security.protocol: SASL_SSL  # 安全协议(如 SASL_SSL)
      sasl.mechanism: SCRAM-SHA-256 # 认证机制
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="secret"; # JAAS 配置
 1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 📤 二、生产者配置
spring:
  kafka:
    producer:
      # 序列化器(必填)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息确认机制(可靠性)
      acks: all  # 可选值:0(无需确认)、1(Leader 确认)、all(ISR 全部确认)
      retries: 3  # 发送失败重试次数
      # 性能优化
      batch-size: 16384  # 批量发送大小(字节),默认 16KB
      linger-ms: 10      # 消息等待时间(毫秒),积攒更多消息批量发送
      buffer-memory: 33554432  # 生产者缓冲区大小(字节),默认 32MB
      compression-type: gzip  # 压缩算法(可选 gzip/lz4/snappy)
      # 事务支持(需配合 @Transactional)
      transaction-id-prefix: tx-  # 开启事务需配置前缀
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 📥 三、消费者配置
spring:
  kafka:
    consumer:
      # 反序列化器(必填)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消费者组(必填)
      group-id: my-consumer-group
      # 偏移量重置策略(无偏移时)
      auto-offset-reset: earliest  # 可选:earliest(最早)、latest(最新)、none(报错)
      # 偏移量提交
      enable-auto-commit: false  # 是否自动提交偏移量(建议手动提交)
      auto-commit-interval: 1000ms  # 自动提交间隔(默认 5s)
      # 消息拉取控制
      max-poll-records: 500  # 单次 poll() 最大消息数(默认 500)
      fetch-max-wait: 500ms  # 拉取消息最长等待时间
      fetch-min-size: 1  # 最小拉取数据量(字节)
      # 会话与心跳
      session-timeout: 30000ms  # 消费者会话超时时间(默认 10s)
      heartbeat-interval: 3000ms  # 心跳间隔(需小于 session-timeout)
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 🔍 四、监听器配置
spring:
  kafka:
    listener:
      # 消费模式
      type: batch  # 可选:single(单条)、batch(批量)
      # 并发控制
      concurrency: 3  # 每个监听器的并发线程数(需 ≤ 分区数)
      # 偏移量提交方式
      ack-mode: manual_immediate  # 手动提交(可选:RECORD、BATCH、TIME等)
      # 错误处理
      missing-topics-fatal: false  # Topic 不存在时是否报错(默认 true)
 1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 🔒 五、安全认证扩展
spring:
  kafka:
    properties:
      # SSL 加密
      security.protocol: SSL
      ssl.truststore-location: classpath:/kafka.client.truststore.jks
      ssl.truststore-password: changeit
      # SASL 认证
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
 1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# ⚠️ 六、重要注意事项
序列化兼容性
- 若传输 JSON 数据,使用 
JsonSerializer/JsonDeserializer,并配置信任包:spring.kafka.consumer.properties.spring.json.trusted.packages: com.example.model1 
- 若传输 JSON 数据,使用 
 批量消费优化
- 启用 
batch监听器时,消费者方法参数需改为List<ConsumerRecord>:@KafkaListener(topics = "my-topic") public void batchListen(List<ConsumerRecord<?, ?>> records) { // 批量处理逻辑 }1
2
3
4 
- 启用 
 生产环境建议
- 关闭自动提交(
enable-auto-commit: false),通过Acknowledgment手动提交偏移量 - 配置 
retries和delivery.timeout.ms防止消息丢失 - 监控 
max.poll.interval.ms避免消费者被踢出组 
- 关闭自动提交(
 
# 💎 总结
以上配置覆盖了 Kafka 在 Spring Boot 中的核心场景,实际使用时可根据需求删减。完整配置模板可参考 Spring Kafka 官方文档 (opens new window)。
编辑  (opens new window)
  上次更新: 2025/06/24, 00:41:57