当前位置 博文首页 > 文章内容

    4.kafka原理

    作者: 栏目:未分类 时间:2020-07-25 16:00:50

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    kafka原理

    4.1.消息存储和查询机制

    4.1.1.消息存储机制

    topic
    #关于类别topic:
        1.每条发布到kafka集群的消息都有一个类别,该类别就是topic
    partition
    #关于分区partition:
        1.partition是物理上的概念,每个topic包含一个或者多个partition
        2.每个分区由一系列有序的不可变的消息组成,是一个有序队列
        3.每个分区在物理上是一个文件夹,分区命名规则:${topicName}-${partitionId}。比如:itheima_topic-0
        4.分区目录下,存储该分区的日志段。包含一个数据文件和两个索引文件
        5.每条消息被追加到对应的分区中,是顺序写磁盘。这也是kafka高吞吐量的重要保证
        6.kafka是局部有序,即只保证一个分区内的消息有序性,不保证全局有序

    logSegment
    #关于日志段logSegment:
        1.日志文件按照大小、或者时间滚动,切分成一个或者多个日志段(logSegment)
            日志段大小默认1GB,配置参数:log.segment.bytes
            时间长度配置参数:log.roll.ms、或者log.roll.hours
        2.kafka的日志段:
            由一个日志文件:
                00000000000000000000.log
            两个索引文件:
                00000000000000000000.index
                00000000000000000000.timeindex
        3.数据文件:
            数据文件以.log为后缀,保存实际消息数据
            命名规则:数据文件的第一条消息偏移量(基准偏移量:BaseOffset),左补0构成20位数字字符组成
            基准偏移量是上一个数据文件的LEO+1。LEO(Log End Offset)
            
        4.偏移量索引文件:
            文件名称与数据文件名称相同,以.index作为后缀。用于快速根据偏移量定位到消息所在的位置
            
        5.时间戳索引文件:
            文件名称与数据文件名称相同,以.timeindex作为后缀。用于根据时间戳快速定位到消息所在的位置

    关于日志段文件:

     

     

     关于偏移量:

    4.1.2.消息查询机制

    #读取offset=368776的消息,需要通过两个步骤完成:
        查找segment file
        通过segment file 查找message

    查找segment file
    #文件偏移量:
        1. 最开始的文件:00000000000000000000.index,起始偏移量(offset)为 0
        2.00000000000000368769.index,消息起始偏移量:368770 = 368769 + 1
        3.00000000000000737337.index,消息起始偏移量:737338=737337 + 1
        4.后续文件以此类推
        
    #查找过程:
        1.根据起始偏移量,文件有序。
        2.通过二分查找,快速定位到当前offset对应的文件。比如:
            当 offset=368776 ,定位到 00000000000000368769.index 和对应 log 文件
    通过segment file 查找message
    #偏移量在.index文件存储:
        kafka并不是每条消息都对应有索引(在.index进行存储)。而是采取了稀疏存储的方式,每个一定字节的数据建立一条索引。索引跨度通过参数配置:index.interval.bytes
    
    #查找过程:
        1.根据当前目标偏移量,通过二分查找,查找值小于等于目标偏移量的最大偏移量
        2.从查找到的最大偏移量开始,顺序扫描数据文件,直到在数据文件中找到偏移量,与目标偏移量相等的消息

    4.2.生产者数据分发策略与发送数据方式

    4.3.1.数据分发策略

    #关于生产者数据分发策略:
        1.在kafka中一个topic下,有一个或者多个partition。那么当Producer向kafka写入数据的时候,如何决定数据该写入哪一个partition中呢?
        2.分三种情况:
            2.1.Producer将数据发送到指定分区:
                
                /**
                *创建消息对象 0 1 2
                *参数说明:
                *    topic:指定类别
                *    partition:指定分区
                *    key:数据的key
                *    value:数据值value
                */
                 public ProducerRecord(String topic, Integer partition, K key, V value) {
                    this(topic, partition, (Long)null, key, value, (Iterable)null);
                  }
                  
            2.2.如果没有明确指定分区:
                2.2.1.数据不是k/v对数据,采取轮询的策略。比如有三个分区:0,1,2。则将数据对应按照0,1,2,0,1,2......轮询方式写入分区
                2.2.2.数据是k/v对数据,采取计算k的哈希值,将k哈希值%可用分区数=当前分区。决定将数据写入到对应的分区

    源码参考:

    // 默认分区类:
    public class DefaultPartitioner implements Partitioner {
        
        // 获取目标分区
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 获取当前topic所有的分区
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            // 获取到总的分区数量
            int numPartitions = partitions.size();
            // 判断当前消息是否有key,如果没有key的话,采取的是轮训策略
            if (keyBytes == null) {
                int nextValue = this.nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return ((PartitionInfo)availablePartitions.get(part)).partition();
                } else {
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // 如果当前的消息是有key的,把当前的key求hash值,再跟总的分区数量求余,决定把数据写入哪一个分区
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        // 轮询策略,获取下一个分区
        private int nextValue(String topic) {
            AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
    
            return counter.getAndIncrement();
        }
            
    }

    4.3.2.发送数据方式

    #关于数据发送方式:
    1.同步阻塞发送:
    适用场景:
    业务不需要高吞吐量、更关心消息发送的顺序、不允许消息发送失败
    参考代码:
    图一

    2.异步发送(发送并忘记):
    适用场景:
    业务只关心吞吐量、不关心消息发送的顺序、可以允许消息发送失败
    参考代码:
    图二

    3.异步发送(回调函数):
    适用场景:
    业务需要知道消息发送成功、不关心消息发送的顺序
    参考代码:
    图三

    4.3.消费者负载均衡策略

    #关于消费者平衡过程:
        1.消费者平衡(Consumer rebalance)指消费者重新加入消费组,并重新分配分区给消费者的过程
        2.会引起消费者平衡的情况:
            2.1.新的消费者加入消费组
            2.2.某个消费者从消费组退出(不管是异常退出,还是正常关闭)
            2.3.增加订阅主题的分区(kafka的分区数,可以动态增加,但不能减少)
            2.4.某台broker宕机,新的协调器当选
            2.5.某个消费者在心跳会话时间内没有发送心跳请求(配置参数:session.timeout.ms),组协调器认为消费者已经退出
    
    #关于消费者与partition对应关系:
        1.如果有三个partition:p0/p1/p2,同一个消费组有三个消费者:c0/c1/c2。则为一一对应关系
        2.如果有三个partition:p0/p1/p2,同一个消费组有两个消费者:c0/c1。则其中一个消费者消费两个分区的数据;另外一个消费者消费一个分区的数据
        3.如果有两个partition:p0/p1,同一个消费组有三个消费者:c0/c1/c2。则有一个消费者为空闲,另外两个消费者分别各自消费一个分区的数据
        
        --总的来说,同一个分区的数据,只能被一个消费组中的一个消费者消费