Plantre Plantre
首页
后端
技术
硬件
  • 前端文章

    • HTML
    • CSS
    • JavaScript
  • 技术

    • 技术文档
    • GitHub技巧
    • Nodejs
    • 博客搭建
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

plantre

一个后端开发者
首页
后端
技术
硬件
  • 前端文章

    • HTML
    • CSS
    • JavaScript
  • 技术

    • 技术文档
    • GitHub技巧
    • Nodejs
    • 博客搭建
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 计算机组成原理

  • 操作系统

  • 计算机网络

  • 设计模式

  • Java

  • Spring

  • SpringCloud

  • MySQL

  • Redis

  • 分布式

  • Zookeeper

  • Dubbo

  • Kafka

    • Kafka的架构
    • docker搭建Kafka集群
    • spring关于kafka的最全配置
    • kafka分主副节点吗
    • kafka的 topic, key , 分区的关系,以及数量对应关系,以及默认值
    • spring项目是事先创建topic还是代码里创建
    • 一个topic一般创建几个分区,几个副本数,只有三个broker的话又是怎么分布的
    • 一共有3个broker,一个topic有10个分区,副本数最大为3?分区又是怎么分布在3个broker上的?用的什么算法
    • 副本数和分区数什么区别
    • 我还是不明白副本数,请画出具体的分布图,3个broker,某个topic10个分区
    • Leader+​​Follower = 副本数?
    • ​​ISR(In-Sync Replicas)是什么,最多有多少个,这个数量怎么得来的
    • 在 Spring Kafka 中发送消息
      • 向kafka中发送重复的数据会怎么样
      • send消息时,key有什么用?
    • 数据结构

    • 算法

    • OJ

    • 从道家哲学看计算机?
    • 后端
    • Kafka
    plantre
    2025-06-23
    目录

    在 Spring Kafka 中发送消息

    在 Spring Kafka 中发送消息后处理回调时,无需依赖 Guava,可直接使用 Spring 或 Kafka 原生接口。以下是修复后的代码方案,解决 ListenableFutureCallback 实现不全的问题,并提供两种替代方案(均避免使用 Guava):


    # ✅ 方案一:使用 Spring 的 ListenableFutureCallback(修正泛型)

    问题根源:匿名内部类未明确泛型类型,导致编译器无法识别 onSuccess 方法参数类型。
    修复代码:

    specInfoList.forEach(specInfo -> {
        ListenableFuture<SendResult<String, BaseProductSpecInfo>> future = 
            kafkaTemplate.send("product-spec-topic", specInfo);
    
        future.addCallback(new ListenableFutureCallback<SendResult<String, BaseProductSpecInfo>>() {
            @Override
            public void onSuccess(SendResult<String, BaseProductSpecInfo> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
                    specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
            }
    
            @Override
            public void onFailure(Throwable ex) {
                logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}", 
                    specInfo.getId(), ex.getMessage(), ex);
            }
        });
    });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19

    关键修改:
    在 new ListenableFutureCallback<>() 中显式声明泛型类型 →
    new ListenableFutureCallback<SendResult<String, BaseProductSpecInfo>>()


    # ✅ 方案二:使用 Kafka 原生 Callback 接口(更轻量)

    若无需 Spring 的 ListenableFuture 特性,可直接用 Kafka 的 Callback:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    specInfoList.forEach(specInfo -> {
        kafkaTemplate.send("product-spec-topic", specInfo)
            .addCallback(new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception ex) {
                    if (ex == null) {
                        logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
                            specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}", 
                            specInfo.getId(), ex.getMessage(), ex);
                    }
                }
            });
    });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    优势:

    • 直接使用 Kafka 客户端原生接口,减少依赖;
    • 避免泛型声明问题,代码更简洁。

    # ✅ 方案三:Lambda 表达式(Java 8+ 推荐)

    进一步简化代码,利用 Lambda 实现回调:

    specInfoList.forEach(specInfo -> {
        kafkaTemplate.send("product-spec-topic", specInfo)
            .addCallback(
                result -> {
                    RecordMetadata metadata = result.getRecordMetadata();
                    logger.info("✅ 发送成功 | 对象ID: {} | Topic: {} | 分区: {} | Offset: {}",
                        specInfo.getId(), metadata.topic(), metadata.partition(), metadata.offset());
                },
                ex -> logger.error("❌ 发送失败 | 对象ID: {} | 错误: {}", 
                    specInfo.getId(), ex.getMessage(), ex)
            );
    });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    # 💡 总结建议

    方案 适用场景 复杂度
    显式泛型声明 需保留 ListenableFuture 特性 中
    Kafka 原生 Callback 追求轻量,避免泛型问题 低
    Lambda 表达式 Java 8+,代码简洁优先 最低

    推荐选择:

    • 若项目已用 Spring 的异步机制 → 选 方案一(修正泛型);
    • 若需最小化依赖 → 选 方案二(Kafka Callback);
    • 若代码简洁性优先 → 选 方案三(Lambda)。
    编辑 (opens new window)
    上次更新: 2025/06/24, 00:41:57
    ​​ISR(In-Sync Replicas)是什么,最多有多少个,这个数量怎么得来的
    向kafka中发送重复的数据会怎么样

    ← ​​ISR(In-Sync Replicas)是什么,最多有多少个,这个数量怎么得来的 向kafka中发送重复的数据会怎么样→

    最近更新
    01
    集成loki
    07-04
    02
    TCP的ESTABLISHED是什么意思
    06-24
    03
    安装1panel
    06-24
    更多文章>
    Theme by Vdoing | Copyright © 2025-2025 plantre | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式