在 Spring Kafka 中发送消息
  在 Spring Kafka 中发送消息后处理回调时,无需依赖 Guava,可直接使用 Spring 或 Kafka 原生接口。以下是修复后的代码方案,解决 ListenableFutureCallback 实现不全的问题,并提供两种替代方案(均避免使用 Guava):
# ✅ 方案一:使用 Spring 的 ListenableFutureCallback(修正泛型)
 问题根源:匿名内部类未明确泛型类型,导致编译器无法识别 onSuccess 方法参数类型。
 修复代码:
specInfoList.forEach(specInfo -> {
    ListenableFuture<SendResult<String, BaseProductSpecInfo>> future = 
        kafkaTemplate.send("product-spec-topic", specInfo);
    future.addCallback(new ListenableFutureCallback<SendResult<String, BaseProductSpecInfo>>() {
        @Override
        public void onSuccess(SendResult<String, BaseProductSpecInfo> result) {
            RecordMetadata metadata = result.getRecordMetadata();
            logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
                specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
        }
        @Override
        public void onFailure(Throwable ex) {
            logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}", 
                specInfo.getId(), ex.getMessage(), ex);
        }
    });
});
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
关键修改:
在 new ListenableFutureCallback<>() 中显式声明泛型类型 →
 new ListenableFutureCallback<SendResult<String, BaseProductSpecInfo>>()
# ✅ 方案二:使用 Kafka 原生 Callback 接口(更轻量)
 若无需 Spring 的 ListenableFuture 特性,可直接用 Kafka 的 Callback:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
specInfoList.forEach(specInfo -> {
    kafkaTemplate.send("product-spec-topic", specInfo)
        .addCallback(new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception ex) {
                if (ex == null) {
                    logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
                        specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}", 
                        specInfo.getId(), ex.getMessage(), ex);
                }
            }
        });
});
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
优势:
- 直接使用 Kafka 客户端原生接口,减少依赖;
 - 避免泛型声明问题,代码更简洁。
 
# ✅ 方案三:Lambda 表达式(Java 8+ 推荐)
进一步简化代码,利用 Lambda 实现回调:
specInfoList.forEach(specInfo -> {
    kafkaTemplate.send("product-spec-topic", specInfo)
        .addCallback(
            result -> {
                RecordMetadata metadata = result.getRecordMetadata();
                logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
                    specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
            },
            ex -> logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}", 
                specInfo.getId(), ex.getMessage(), ex)
        );
});
 1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 💡 总结建议
| 方案 | 适用场景 | 复杂度 | 
|---|---|---|
| 显式泛型声明 | 需保留 ListenableFuture 特性 |  中 | 
Kafka 原生 Callback |  追求轻量,避免泛型问题 | 低 | 
| Lambda 表达式 | Java 8+,代码简洁优先 | 最低 | 
推荐选择:
- 若项目已用 Spring 的异步机制 → 选 方案一(修正泛型);
 - 若需最小化依赖 → 选 方案二(Kafka 
Callback); - 若代码简洁性优先 → 选 方案三(Lambda)。
 
编辑  (opens new window)
  上次更新: 2025/06/24, 00:41:57