spring关于kafka的最全配置
以下是基于 Spring Boot 官方文档及多篇实践总结的 application.yml
中 Kafka 的最全配置详解,涵盖生产者、消费者、监听器、安全认证等场景,按模块分类整理:
# ⚙️ 一、公共基础配置
spring:
kafka:
# Kafka 集群地址(必填,多节点逗号分隔)
bootstrap-servers: kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
# 全局属性扩展(支持所有 Kafka 原生参数)
properties:
security.protocol: SASL_SSL # 安全协议(如 SASL_SSL)
sasl.mechanism: SCRAM-SHA-256 # 认证机制
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="secret"; # JAAS 配置
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 📤 二、生产者配置
spring:
kafka:
producer:
# 序列化器(必填)
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息确认机制(可靠性)
acks: all # 可选值:0(无需确认)、1(Leader 确认)、all(ISR 全部确认)
retries: 3 # 发送失败重试次数
# 性能优化
batch-size: 16384 # 批量发送大小(字节),默认 16KB
linger-ms: 10 # 消息等待时间(毫秒),积攒更多消息批量发送
buffer-memory: 33554432 # 生产者缓冲区大小(字节),默认 32MB
compression-type: gzip # 压缩算法(可选 gzip/lz4/snappy)
# 事务支持(需配合 @Transactional)
transaction-id-prefix: tx- # 开启事务需配置前缀
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 📥 三、消费者配置
spring:
kafka:
consumer:
# 反序列化器(必填)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者组(必填)
group-id: my-consumer-group
# 偏移量重置策略(无偏移时)
auto-offset-reset: earliest # 可选:earliest(最早)、latest(最新)、none(报错)
# 偏移量提交
enable-auto-commit: false # 是否自动提交偏移量(建议手动提交)
auto-commit-interval: 1000ms # 自动提交间隔(默认 5s)
# 消息拉取控制
max-poll-records: 500 # 单次 poll() 最大消息数(默认 500)
fetch-max-wait: 500ms # 拉取消息最长等待时间
fetch-min-size: 1 # 最小拉取数据量(字节)
# 会话与心跳
session-timeout: 30000ms # 消费者会话超时时间(默认 10s)
heartbeat-interval: 3000ms # 心跳间隔(需小于 session-timeout)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 🔍 四、监听器配置
spring:
kafka:
listener:
# 消费模式
type: batch # 可选:single(单条)、batch(批量)
# 并发控制
concurrency: 3 # 每个监听器的并发线程数(需 ≤ 分区数)
# 偏移量提交方式
ack-mode: manual_immediate # 手动提交(可选:RECORD、BATCH、TIME等)
# 错误处理
missing-topics-fatal: false # Topic 不存在时是否报错(默认 true)
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 🔒 五、安全认证扩展
spring:
kafka:
properties:
# SSL 加密
security.protocol: SSL
ssl.truststore-location: classpath:/kafka.client.truststore.jks
ssl.truststore-password: changeit
# SASL 认证
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# ⚠️ 六、重要注意事项
序列化兼容性
- 若传输 JSON 数据,使用
JsonSerializer
/JsonDeserializer
,并配置信任包:spring.kafka.consumer.properties.spring.json.trusted.packages: com.example.model
1
- 若传输 JSON 数据,使用
批量消费优化
- 启用
batch
监听器时,消费者方法参数需改为List<ConsumerRecord>
:@KafkaListener(topics = "my-topic") public void batchListen(List<ConsumerRecord<?, ?>> records) { // 批量处理逻辑 }
1
2
3
4
- 启用
生产环境建议
- 关闭自动提交(
enable-auto-commit: false
),通过Acknowledgment
手动提交偏移量 - 配置
retries
和delivery.timeout.ms
防止消息丢失 - 监控
max.poll.interval.ms
避免消费者被踢出组
- 关闭自动提交(
# 💎 总结
以上配置覆盖了 Kafka 在 Spring Boot 中的核心场景,实际使用时可根据需求删减。完整配置模板可参考 Spring Kafka 官方文档 (opens new window)。
编辑 (opens new window)
上次更新: 2025/06/24, 00:41:57