Plantre Plantre
首页
后端
技术
硬件
  • 前端文章

    • HTML
    • CSS
    • JavaScript
  • 技术

    • 技术文档
    • GitHub技巧
    • Nodejs
    • 博客搭建
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

plantre

一个后端开发者
首页
后端
技术
硬件
  • 前端文章

    • HTML
    • CSS
    • JavaScript
  • 技术

    • 技术文档
    • GitHub技巧
    • Nodejs
    • 博客搭建
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 计算机组成原理

  • 操作系统

  • 计算机网络

  • 设计模式

  • Java

  • Spring

  • SpringCloud

  • MySQL

  • Redis

  • 分布式

  • Zookeeper

  • Dubbo

  • Kafka

    • Kafka的架构
    • docker搭建Kafka集群
    • spring关于kafka的最全配置
    • kafka分主副节点吗
    • kafka的 topic, key , 分区的关系,以及数量对应关系,以及默认值
    • spring项目是事先创建topic还是代码里创建
    • 一个topic一般创建几个分区,几个副本数,只有三个broker的话又是怎么分布的
    • 一共有3个broker,一个topic有10个分区,副本数最大为3?分区又是怎么分布在3个broker上的?用的什么算法
    • 副本数和分区数什么区别
    • 我还是不明白副本数,请画出具体的分布图,3个broker,某个topic10个分区
    • Leader+​​Follower = 副本数?
    • ​​ISR(In-Sync Replicas)是什么,最多有多少个,这个数量怎么得来的
    • 在 Spring Kafka 中发送消息
    • 向kafka中发送重复的数据会怎么样
    • send消息时,key有什么用?
    • 数据结构

    • 算法

    • OJ

    • 从道家哲学看计算机?
    • 后端
    • Kafka
    plantre
    2025-06-23
    目录

    send消息时,key有什么用?

    在 Kafka 中,消息的 key 主要用于 控制消息的分区路由、保证消息顺序性,并间接支持消息过滤、日志压缩等功能。以下是核心作用详解及代码示例:


    # 🔑 一、Key 的核心作用

    # 1. 决定消息的分区路由

    • 机制:Kafka 根据 key 的哈希值(默认使用 MurmurHash2 算法)计算分区号,公式为:
      partition = hash(key) % 分区数量。
    • 效果:
      • ✅ 相同 key 的消息总是进入同一分区。
      • ⚠️ 无 key 时:消息以轮询或随机方式分配分区(取决于 Kafka 版本)。
    • 源码逻辑(Kafka 默认分区策略):
      // DefaultPartitioner 源码逻辑
      public int partition(String topic, Object key, byte[] keyBytes, ...) {
          if (keyBytes == null) {
              return stickyPartitionCache.partition(topic, cluster); // 无 key 时随机分配
          }
          return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); // 有 key 时哈希取模
      }
      
      1
      2
      3
      4
      5
      6
      7

    # 2. 保证消息顺序性

    • 原理:Kafka 仅保证单个分区内的消息顺序(生产者发送顺序 = 消费者接收顺序)。
    • 关键操作:
      将需要顺序处理的消息(如同一订单、同一用户)设置相同 key → 确保它们进入同一分区 → 实现局部有序。
    • 适用场景:
      • 订单状态变更(创建 → 支付 → 发货)。
      • 用户行为序列(登录 → 操作 → 登出)。

    # 3. 支持消息过滤与分组

    • 消费者端:可通过 key 过滤或聚合相关消息(如只处理特定用户 ID 的消息)。
    • 日志压缩(Log Compaction):
      Kafka 会保留每个 key 的最新消息,适用于维护状态(如用户配置实时更新)。

    # 💻 二、代码示例

    # 场景:发送用户行为事件,需保证同一用户的事件顺序处理。

    # 1. Java 原生客户端(无 Spring)

    import org.apache.kafka.clients.producer.*;
    
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    Producer<String, String> producer = new KafkaProducer<>(props);
    
    // 发送消息:用户ID作为 key
    String topic = "user-events";
    String userId = "user-123";
    String event = "click_button";
    
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, userId, event);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            System.err.println("发送失败: " + exception.getMessage());
        } else {
            System.out.printf(
                "✅ 发送成功 | Key=%s | 分区=%d | Offset=%d%n",
                userId, metadata.partition(), metadata.offset()
            );
        }
    });
    producer.close();
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    # 2. Spring Kafka 示例

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import java.util.concurrent.CompletableFuture;
    
    @Service
    public class EventProducer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendUserEvent(String userId, String event) {
            // 指定 key = userId
            CompletableFuture<SendResult<String, String>> future = 
                kafkaTemplate.send("user-events", userId, event);
            
            future.whenComplete((result, ex) -> {
                if (ex != null) {
                    System.err.println("❌ 发送失败: " + ex.getMessage());
                } else {
                    RecordMetadata metadata = result.getRecordMetadata();
                    System.out.printf(
                        "✅ 发送成功 | Key=%s | 分区=%d | Offset=%d%n",
                        userId, metadata.partition(), metadata.offset()
                    );
                }
            });
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

    # 3. 消费者过滤消息(基于 Key)

    @KafkaListener(topics = "user-events", groupId = "user-group")
    public void listen(ConsumerRecord<String, String> record) {
        String userId = record.key();
        String event = record.value();
        
        // 只处理 user-123 的事件
        if ("user-123".equals(userId)) {
            System.out.println("处理事件: " + event);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    # ⚖️ 三、何时使用 Key?

    场景 是否用 Key 原因
    需要消息顺序性(如订单流) ✅ 必须 相同 Key 确保消息进入同一分区,保证顺序。
    需按业务维度分组(如用户ID) ✅ 推荐 相同 Key 的消息集中在同一分区,便于聚合处理。
    日志压缩(维护最新状态) ✅ 必须 依赖 Key 标识唯一实体,保留最新值。
    高吞吐量,无需顺序或分组 ❌ 避免 无 Key 时消息均匀分布到所有分区,提升并行度。

    # 💎 总结

    • 核心作用:Key 是 Kafka 分区路由的控制器,直接影响消息存储位置和顺序性。
    • 最佳实践:
      • 顺序场景 → 用业务 ID(如订单号、用户ID)作为 Key。
      • 高吞吐场景 → 避免 Key 以均匀分布消息。
      • 状态维护 → 启用日志压缩并设置 Key。

    通过合理使用 Key,可精准控制消息的物理分布和业务逻辑,是 Kafka 高效可靠的核心设计之一。

    编辑 (opens new window)
    上次更新: 2025/06/24, 00:41:57
    向kafka中发送重复的数据会怎么样
    查找

    ← 向kafka中发送重复的数据会怎么样 查找→

    最近更新
    01
    集成loki
    07-04
    02
    TCP的ESTABLISHED是什么意思
    06-24
    03
    安装1panel
    06-24
    更多文章>
    Theme by Vdoing | Copyright © 2025-2025 plantre | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式