send消息时,key有什么用?
在 Kafka 中,消息的 key
主要用于 控制消息的分区路由、保证消息顺序性,并间接支持消息过滤、日志压缩等功能。以下是核心作用详解及代码示例:
# 🔑 一、Key 的核心作用
# 1. 决定消息的分区路由
- 机制:Kafka 根据
key
的哈希值(默认使用 MurmurHash2 算法)计算分区号,公式为:
partition = hash(key) % 分区数量
。 - 效果:
- ✅ 相同
key
的消息总是进入同一分区。 - ⚠️ 无
key
时:消息以轮询或随机方式分配分区(取决于 Kafka 版本)。
- ✅ 相同
- 源码逻辑(Kafka 默认分区策略):
// DefaultPartitioner 源码逻辑 public int partition(String topic, Object key, byte[] keyBytes, ...) { if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); // 无 key 时随机分配 } return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); // 有 key 时哈希取模 }
1
2
3
4
5
6
7
# 2. 保证消息顺序性
- 原理:Kafka 仅保证单个分区内的消息顺序(生产者发送顺序 = 消费者接收顺序)。
- 关键操作:
将需要顺序处理的消息(如同一订单、同一用户)设置相同key
→ 确保它们进入同一分区 → 实现局部有序。 - 适用场景:
- 订单状态变更(创建 → 支付 → 发货)。
- 用户行为序列(登录 → 操作 → 登出)。
# 3. 支持消息过滤与分组
- 消费者端:可通过
key
过滤或聚合相关消息(如只处理特定用户 ID 的消息)。 - 日志压缩(Log Compaction):
Kafka 会保留每个key
的最新消息,适用于维护状态(如用户配置实时更新)。
# 💻 二、代码示例
# 场景:发送用户行为事件,需保证同一用户的事件顺序处理。
# 1. Java 原生客户端(无 Spring)
import org.apache.kafka.clients.producer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息:用户ID作为 key
String topic = "user-events";
String userId = "user-123";
String event = "click_button";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, userId, event);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.printf(
"✅ 发送成功 | Key=%s | 分区=%d | Offset=%d%n",
userId, metadata.partition(), metadata.offset()
);
}
});
producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 2. Spring Kafka 示例
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.CompletableFuture;
@Service
public class EventProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendUserEvent(String userId, String event) {
// 指定 key = userId
CompletableFuture<SendResult<String, String>> future =
kafkaTemplate.send("user-events", userId, event);
future.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("❌ 发送失败: " + ex.getMessage());
} else {
RecordMetadata metadata = result.getRecordMetadata();
System.out.printf(
"✅ 发送成功 | Key=%s | 分区=%d | Offset=%d%n",
userId, metadata.partition(), metadata.offset()
);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 3. 消费者过滤消息(基于 Key)
@KafkaListener(topics = "user-events", groupId = "user-group")
public void listen(ConsumerRecord<String, String> record) {
String userId = record.key();
String event = record.value();
// 只处理 user-123 的事件
if ("user-123".equals(userId)) {
System.out.println("处理事件: " + event);
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# ⚖️ 三、何时使用 Key?
场景 | 是否用 Key | 原因 |
---|---|---|
需要消息顺序性(如订单流) | ✅ 必须 | 相同 Key 确保消息进入同一分区,保证顺序。 |
需按业务维度分组(如用户ID) | ✅ 推荐 | 相同 Key 的消息集中在同一分区,便于聚合处理。 |
日志压缩(维护最新状态) | ✅ 必须 | 依赖 Key 标识唯一实体,保留最新值。 |
高吞吐量,无需顺序或分组 | ❌ 避免 | 无 Key 时消息均匀分布到所有分区,提升并行度。 |
# 💎 总结
- 核心作用:Key 是 Kafka 分区路由的控制器,直接影响消息存储位置和顺序性。
- 最佳实践:
- 顺序场景 → 用业务 ID(如订单号、用户ID)作为 Key。
- 高吞吐场景 → 避免 Key 以均匀分布消息。
- 状态维护 → 启用日志压缩并设置 Key。
通过合理使用 Key,可精准控制消息的物理分布和业务逻辑,是 Kafka 高效可靠的核心设计之一。
编辑 (opens new window)
上次更新: 2025/06/24, 00:41:57