在 Spring Kafka 中发送消息
在 Spring Kafka 中发送消息后处理回调时,无需依赖 Guava,可直接使用 Spring 或 Kafka 原生接口。以下是修复后的代码方案,解决 ListenableFutureCallback
实现不全的问题,并提供两种替代方案(均避免使用 Guava):
# ✅ 方案一:使用 Spring 的 ListenableFutureCallback
(修正泛型)
问题根源:匿名内部类未明确泛型类型,导致编译器无法识别 onSuccess
方法参数类型。
修复代码:
specInfoList.forEach(specInfo -> {
ListenableFuture<SendResult<String, BaseProductSpecInfo>> future =
kafkaTemplate.send("product-spec-topic", specInfo);
future.addCallback(new ListenableFutureCallback<SendResult<String, BaseProductSpecInfo>>() {
@Override
public void onSuccess(SendResult<String, BaseProductSpecInfo> result) {
RecordMetadata metadata = result.getRecordMetadata();
logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
}
@Override
public void onFailure(Throwable ex) {
logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}",
specInfo.getId(), ex.getMessage(), ex);
}
});
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
关键修改:
在 new ListenableFutureCallback<>()
中显式声明泛型类型 →
new ListenableFutureCallback<SendResult<String, BaseProductSpecInfo>>()
# ✅ 方案二:使用 Kafka 原生 Callback
接口(更轻量)
若无需 Spring 的 ListenableFuture
特性,可直接用 Kafka 的 Callback
:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
specInfoList.forEach(specInfo -> {
kafkaTemplate.send("product-spec-topic", specInfo)
.addCallback(new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
if (ex == null) {
logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
} else {
logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}",
specInfo.getId(), ex.getMessage(), ex);
}
}
});
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
优势:
- 直接使用 Kafka 客户端原生接口,减少依赖;
- 避免泛型声明问题,代码更简洁。
# ✅ 方案三:Lambda 表达式(Java 8+ 推荐)
进一步简化代码,利用 Lambda 实现回调:
specInfoList.forEach(specInfo -> {
kafkaTemplate.send("product-spec-topic", specInfo)
.addCallback(
result -> {
RecordMetadata metadata = result.getRecordMetadata();
logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
},
ex -> logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}",
specInfo.getId(), ex.getMessage(), ex)
);
});
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 💡 总结建议
方案 | 适用场景 | 复杂度 |
---|---|---|
显式泛型声明 | 需保留 ListenableFuture 特性 | 中 |
Kafka 原生 Callback | 追求轻量,避免泛型问题 | 低 |
Lambda 表达式 | Java 8+,代码简洁优先 | 最低 |
推荐选择:
- 若项目已用 Spring 的异步机制 → 选 方案一(修正泛型);
- 若需最小化依赖 → 选 方案二(Kafka
Callback
); - 若代码简洁性优先 → 选 方案三(Lambda)。
编辑 (opens new window)
上次更新: 2025/06/24, 00:41:57