当前位置 博文首页 > 文章内容

    kafka学习笔记(七)kafka的状态机模块

    作者: 栏目:未分类 时间:2020-09-19 17:00:32

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    概述

     这一篇随笔介绍kafka的状态机模块,Kafka 源码中有很多状态机和管理器,比如之前我们学过的 Controller 通道管理器 ControllerChannelManager、处理 Controller 事件的 ControllerEventManager,等等。这些管理器和状态机,大多与各自的“宿主”组件关系密切,可以说是大小不同、功能各异。就比如 Controller 的这两个管理器,必须要与 Controller 组件紧耦合在一起才能实现各自的功能。不过,Kafka 中还是有一些状态机和管理器具有相对独立的功能框架,不严重依赖使用方,也就是我在这个模块为你精选的 TopicDeletionManager(主题删除管理器)、ReplicaStateMachine(副本状态机)和 PartitionStateMachine(分区状态机)。TopicDeletionManager:负责对指定 Kafka 主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。

    ReplicaStateMachine:负责定义 Kafka 副本状态、合法的状态转换,以及管理状态之间的转换。

    PartitionStateMachine:负责定义 Kafka 分区状态、合法的状态转换,以及管理状态之间的转换。

    TopicDeletionManager

    TopicDeletionManager.scala 这个源文件,包括 3 个部分。

    DeletionClient 接口:负责实现删除主题以及后续的动作,比如更新元数据等。这个接口里定义了 4 个方法,分别是 deleteTopic、deleteTopicDeletions、mutePartitionModifications 和 sendMetadataUpdate。我们后面再详细学习它们的代码。

    ControllerDeletionClient 类:实现 DeletionClient 接口的类,分别实现了刚刚说到的那 4 个方法。

    TopicDeletionManager 类:主题删除管理器类,定义了若干个方法维护主题删除前后集群状态的正确性。比如,什么时候才能删除主题、什么时候主题不能被删除、主题删除过程中要规避哪些操作,等等。

    DeletionClient 接口及其实现

    DeletionClient 接口定义的方法用于删除主题,并将删除主题这件事儿同步给其他 Broker。目前,DeletionClient 这个接口只有一个实现类,即 ControllerDeletionClient。我们看下这个实现类的代码:

     1 class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
     2   // 删除给定主题
     3   override def deleteTopic(topic: String, epochZkVersion: Int): Unit = {
     4     // 删除/brokers/topics/<topic>节点
     5     zkClient.deleteTopicZNode(topic, epochZkVersion)
     6     // 删除/config/topics/<topic>节点
     7     zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion)
     8     // 删除/admin/delete_topics/<topic>节点
     9     zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion)
    10   }
    11   // 删除/admin/delete_topics下的给定topic子节点
    12   override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = {
    13     zkClient.deleteTopicDeletions(topics, epochZkVersion)
    14   }
    15   // 取消/brokers/topics/<topic>节点数据变更的监听
    16   override def mutePartitionModifications(topic: String): Unit = {
    17     controller.unregisterPartitionModificationsHandlers(Seq(topic))
    18   }
    19   // 向集群Broker发送指定分区的元数据更新请求
    20   override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
    21     controller.sendUpdateMetadataRequest(
    22       controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    23   }
    24 }

    这个类的构造函数接收两个字段。同时,由于是 DeletionClient 接口的实现类,因而该类实现了 DeletionClient 接口定义的四个方法。先来说构造函数的两个字段:KafkaController 实例和 KafkaZkClient 实例。KafkaController 实例,我们已经很熟悉了,就是 Controller 组件对象;而 KafkaZkClient 实例,就是 Kafka 与 ZooKeeper 交互的客户端对象。接下来,我们再结合代码看下 DeletionClient 接口实现类 ControllerDeletionClient 定义的 4 个方法。我来简单介绍下这 4 个方法大致是做什么的。

    1.deleteTopic它用于删除主题在 ZooKeeper 上的所有“痕迹”。具体方法是,分别调用 KafkaZkClient 的 3 个方法去删除 ZooKeeper 下 /brokers/topics/节点、/config/topics/节点和 /admin/delete_topics/节点。2.deleteTopicDeletions它用于删除 ZooKeeper 下待删除主题的标记节点。具体方法是,调用 KafkaZkClient 的 deleteTopicDeletions 方法,批量删除一组主题在 /admin/delete_topics 下的子节点。注意,deleteTopicDeletions 这个方法名结尾的 Deletions,表示 /admin/delete_topics 下的子节点。所以,deleteTopic 是删除主题,deleteTopicDeletions 是删除 /admin/delete_topics 下的对应子节点。到这里,我们还要注意的一点是,这两个方法里都有一个 epochZkVersion 的字段,代表期望的 Controller Epoch 版本号。如果你使用一个旧的 Epoch 版本号执行这些方法,ZooKeeper 会拒绝,因为和它自己保存的版本号不匹配。如果一个 Controller 的 Epoch 值小于 ZooKeeper 中保存的,那么这个 Controller 很可能是已经过期的 Controller。这种 Controller 就被称为 Zombie Controller。epochZkVersion 字段的作用,就是隔离 Zombie Controller 发送的操作。

    3.mutePartitionModifications它的作用是屏蔽主题分区数据变更监听器,具体实现原理其实就是取消 /brokers/topics/节点数据变更的监听。这样当该主题的分区数据发生变更后,由于对应的 ZooKeeper 监听器已经被取消了,因此不会触发 Controller 相应的处理逻辑。那为什么要取消这个监听器呢?其实,主要是为了避免操作之间的相互干扰。设想下,用户 A 发起了主题删除,而同时用户 B 为这个主题新增了分区。此时,这两个操作就会相互冲突,如果允许 Controller 同时处理这两个操作,势必会造成逻辑上的混乱以及状态的不一致。为了应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,以确保不再响应用户对该主题的其他操作。mutePartitionModifications 方法的实现原理很简单,它会调用 unregisterPartitionModificationsHandlers,并接着调用 KafkaZkClient 的 unregisterZNodeChangeHandler 方法,取消 ZooKeeper 上对给定主题的分区节点数据变更的监听。

    4.sendMetadataUpdate它会调用 KafkaController 的 sendUpdateMetadataRequest 方法,给集群所有 Broker 发送更新请求,告诉它们不要再为已删除主题的分区提供服务。

    TopicDeletionManager 定义及初始化

    有了这些铺垫,我们再来看主题删除管理器的主要入口:TopicDeletionManager 类。这个类的定义代码,如下:

     1 class TopicDeletionManager(
     2   // KafkaConfig类,保存Broker端参数
     3   config: KafkaConfig, 
     4   // 集群元数据
     5   controllerContext: ControllerContext,
     6   // 副本状态机,用于设置副本状态
     7   replicaStateMachine: ReplicaStateMachine,
     8   // 分区状态机,用于设置分区状态
     9   partitionStateMachine: PartitionStateMachine,
    10   // DeletionClient接口,实现主题删除
    11   client: DeletionClient) extends Logging {
    12   this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
    13   // 是否允许删除主题
    14   val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
    15   ......
    16 }

    该类主要的属性有 6 个,我们分别来看看。

    config:KafkaConfig 实例,可以用作获取 Broker 端参数 delete.topic.enable 的值。该参数用于控制是否允许删除主题,默认值是 true,即 Kafka 默认允许用户删除主题。

    controllerContext:Controller 端保存的元数据信息。删除主题必然要变更集群元数据信息,因此 TopicDeletionManager 需要用到 controllerContext 的方法,去更新它保存的数据。

    replicaStateMachine 和 partitionStateMachine:副本状态机和分区状态机。它们各自负责副本和分区的状态转换,以保持副本对象和分区对象在集群上的一致性状态。这两个状态机是后面两讲的重要知识点。

    client:前面介绍的 DeletionClient 接口。TopicDeletionManager 通过该接口执行 ZooKeeper 上节点的相应更新。

    isDeleteTopicEnabled:表明主题是否允许被删除。它是 Broker 端参数 delete.topic.enable 的值,默认是 true,表示 Kafka 允许删除主题。源码中大量使用这个字段判断主题的可删除性。前面的 config 参数的主要目的就是设置这个字段的值。被设定之后,config 就不再被源码使用了。

    TopicDeletionManager 重要方法

    最重要的当属 resumeDeletions 方法。它是重启主题删除操作过程的方法。主题因为某些事件可能一时无法完成删除,比如主题分区正在进行副本重分配等。一旦这些事件完成后,主题重新具备可删除的资格。此时,代码就需要调用 resumeDeletions 重启删除操作。这个方法之所以很重要,是因为它还串联了 TopicDeletionManager 类的很多方法,如 completeDeleteTopic 和 onTopicDeletion 等。因此,你完全可以从 resumeDeletions 方法开始,逐渐深入到其他方法代码的学习。那我们就先学习 resumeDeletions 的实现代码吧。

     1 private def resumeDeletions(): Unit = {
     2   // 从元数据缓存中获取要删除的主题列表
     3   val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
     4   // 待重试主题列表
     5   val topicsEligibleForRetry = mutable.Set.empty[String]
     6   // 待删除主题列表
     7   val topicsEligibleForDeletion = mutable.Set.empty[String]
     8   if (topicsQueuedForDeletion.nonEmpty)
     9     info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
    10   // 遍历每个待删除主题
    11   topicsQueuedForDeletion.foreach { topic =>
    12     // 如果该主题所有副本已经是ReplicaDeletionSuccessful状态
    13     // 即该主题已经被删除  
    14     if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
    15       // 调用completeDeleteTopic方法完成后续操作即可
    16       completeDeleteTopic(topic)
    17       info(s"Deletion of topic $topic successfully completed")
    18      // 如果主题删除尚未开始并且主题当前无法执行删除的话
    19     } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
    20       if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
    21         // 把该主题加到待重试主题列表中用于后续重试
    22         topicsEligibleForRetry += topic
    23       }
    24     }
    25     // 如果该主题能够被删除
    26     if (isTopicEligibleForDeletion(topic)) {
    27       info(s"Deletion of topic $topic (re)started")
    28       topicsEligibleForDeletion += topic
    29     }
    30   }
    31   // 重试待重试主题列表中的主题删除操作
    32   if (topicsEligibleForRetry.nonEmpty) {
    33     retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
    34   }
    35   // 调用onTopicDeletion方法,对待删除主题列表中的主题执行删除操作
    36   if (topicsEligibleForDeletion.nonEmpty) {
    37     onTopicDeletion(topicsEligibleForDeletion)
    38   }
    39 }

    通过代码我们发现,这个方法首先从元数据缓存中获取要删除的主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题。然后,代码遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是 ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用 completeDeleteTopic 方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。最后,该方法调用 retryDeletionForIneligibleReplicas 方法,来重试待重试主题列表中的主题删除操作。对于待删除主题列表中的主题则调用 onTopicDeletion 删除之。值得一提的是,retryDeletionForIneligibleReplicas 方法用于重试主题删除。这是通过将对应主题副本的状态,从 ReplicaDeletionIneligible 变更到 OfflineReplica 来完成的。这样,后续再次调用 resumeDeletions 时,会尝试重新删除主题。

    总结:在主题删除过程中,Kafka 会调整集群中三个地方的数据:ZooKeeper、元数据缓存和磁盘日志文件。删除主题时,ZooKeeper 上与该主题相关的所有 ZNode 节点必须被清除;Controller 端元数据缓存中的相关项,也必须要被处理,并且要被同步到集群的其他 Broker 上;而磁盘日志文件,更是要清理的首要目标。这三个地方必须要统一处理,就好似我们常说的原子性操作一样。

    ReplicaStateMachine

    我们看下 ReplicaStateMachine 及其子类 ZKReplicaStateMachine 在代码中是如何定义的,请看这两个代码片段:

     1 // ReplicaStateMachine抽象类定义
     2 abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
     3   ......
     4 }
     5 
     6 // ZkReplicaStateMachine具体实现类定义
     7 class ZkReplicaStateMachine(config: KafkaConfig, 
     8   stateChangeLogger: StateChangeLogger,
     9   controllerContext: ControllerContext,
    10   zkClient: KafkaZkClient,
    11   controllerBrokerRequestBatch: ControllerBrokerRequestBatch) 
    12   extends ReplicaStateMachine(controllerContext) with Logging {
    13   ......
    14 }

    KafkaController 对象在构建的时候,就会初始化一个 ZkReplicaStateMachine 实例,如下列代码所示:

    1 val replicaStateMachine: ReplicaStateMachine = new   
    2   ZkReplicaStateMachine(config, stateChangeLogger, 
    3     controllerContext, zkClient,
    4     new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))

    你可能会问:“如果一个 Broker 没有被选举为 Controller,它也会构建 KafkaController 对象实例吗?”没错!所有 Broker 在启动时,都会创建 KafkaController 实例,因而也会创建 ZKReplicaStateMachine 实例。每个 Broker 都会创建这些实例,并不代表每个 Broker 都会启动副本状态机。事实上,只有在 Controller 所在的 Broker 上,副本状态机才会被启动。具体的启动代码位于 KafkaController 的 onControllerFailover 方法。

    副本状态及状态管理流程

    副本状态机一旦被启动,就意味着它要行使它最重要的职责了:管理副本状态的转换。不过,在学习如何管理状态之前,我们必须要弄明白,当前都有哪些状态,以及它们的含义分别是什么。源码中的 ReplicaState 定义了 7 种副本状态。NewReplica:副本被创建之后所处的状态。OnlineReplica:副本正常提供服务时所处的状态。OfflineReplica:副本服务下线时所处的状态。ReplicaDeletionStarted:副本被删除时所处的状态。ReplicaDeletionSuccessful:副本被成功删除后所处的状态。ReplicaDeletionIneligible:开启副本删除,但副本暂时无法被删除时所处的状态。NonExistentReplica:副本从副本状态机被移除前所处的状态。具体到代码而言,ReplicaState 接口及其实现对象定义了每种状态的序号,以及合法的前置状态。我以 OnlineReplica 代码为例进行说明:

     1 // ReplicaState接口
     2 sealed trait ReplicaState {
     3   def state: Byte
     4   def validPreviousStates: Set[ReplicaState] // 定义合法的前置状态
     5 }
     6 
     7 // OnlineReplica状态
     8 case object OnlineReplica extends ReplicaState {
     9   val state: Byte = 2
    10   val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    11 }

    状态转换方法定义

    在详细介绍 handleStateChanges 方法前,我稍微花点时间,给你简单介绍下其他 7 个方法都是做什么用的。就像前面说过的,这些方法主要是起辅助的作用。只有清楚了这些方法的用途,你才能更好地理解 handleStateChanges 的实现逻辑。logFailedStateChange:仅仅是记录一条错误日志,表明执行了一次无效的状态变更。

    logInvalidTransition:同样也是记录错误之用,记录一次非法的状态转换。

    logSuccessfulTransition:记录一次成功的状态转换操作。

    getTopicPartitionStatesFromZk:从 ZooKeeper 中获取指定分区的状态信息,包括每个分区的 Leader 副本、ISR 集合等数据。

    doRemoveReplicasFromIsr:把给定的副本对象从给定分区 ISR 中移除。

    removeReplicasFromIsr:调用 doRemoveReplicasFromIsr 方法,实现将给定的副本对象从给定分区 ISR 中移除的功能。

    doHandleStateChanges:执行状态变更和转换操作的主力方法。接下来,我们会详细学习它的源码部分。

    handleStateChanges 方法

    handleStateChange 方法的作用是处理状态的变更,是对外提供状态转换操作的入口方法。其方法如下:

     1 override def handleStateChanges(
     2   replicas: Seq[PartitionAndReplica], 
     3   targetState: ReplicaState): Unit = {
     4   if (replicas.nonEmpty) {
     5     try {
     6       // 清空Controller待发送请求集合
     7       controllerBrokerRequestBatch.newBatch()
     8       // 将所有副本对象按照Broker进行分组,依次执行状态转换操作
     9       replicas.groupBy(_.replica).foreach {
    10         case (replicaId, replicas) =>
    11           doHandleStateChanges(replicaId, replicas, targetState)
    12       }
    13       // 发送对应的Controller请求给Broker
    14       controllerBrokerRequestBatch.sendRequestsToBrokers(
    15         controllerContext.epoch)
    16     } catch {
    17       // 如果Controller易主,则记录错误日志然后抛出异常
    18       case e: ControllerMovedException =>
    19         error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
    20         throw e
    21       case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
    22     }
    23   }
    24 }

    代码逻辑总体上分为两步:第 1 步是调用 doHandleStateChanges 方法执行真正的副本状态转换;第 2 步是给集群中的相应 Broker 批量发送请求。在执行第 1 步的时候,它会将 replicas 按照 Broker ID 进行分组。举个例子,如果我们使用 < 主题名,分区号,副本 Broker ID> 表示副本对象,假设 replicas 为集合(<test, 0,="" 0="">, <test, 0,="" 1="">, <test, 1,="" 0="">, <test, 1,="" 1="">),那么,在调用 doHandleStateChanges 方法前,代码会将 replicas 按照 Broker ID 进行分组,即变成:Map(0 -> Set(<test, 0,="" 0="">, <test, 1,="" 0="">),1 -> Set(<test, 0,="" 1="">, <test, 1,="" 1="">))。待这些都做完之后,代码开始调用 doHandleStateChanges 方法,执行状态转换操作。这个方法看着很长,其实都是不同的代码分支。

    我们可以发现,代码的第 1 步,会尝试获取给定副本对象在 Controller 端元数据缓存中的当前状态,如果没有保存某个副本对象的状态,代码会将其初始化为 NonExistentReplica 状态。第 2 步,代码根据不同 ReplicaState 中定义的合法前置状态集合以及传入的目标状态(targetState),将给定的副本对象集合划分成两部分:能够合法转换的副本对象集合,以及执行非法状态转换的副本对象集合。doHandleStateChanges 方法会为后者中的每个副本对象记录一条错误日志。第 3 步,代码携带能够执行合法转换的副本对象集合,进入到不同的代码分支。

    由于当前 Kafka 为副本定义了 7 类状态,因此,这里的代码分支总共有 7 路。我挑选几路最常见的状态转换路径详细说明下,包括副本被创建时被转换到 NewReplica 状态,副本正常工作时被转换到 OnlineReplica 状态,副本停止服务后被转换到 OfflineReplica 状态。至于剩下的记录代码,你可以在课后自行学习下,它们的转换操作原理大致是相同的。

    第 1 路:转换到 NewReplica 状态

    首先,我们先来看第 1 路,即目标状态是 NewReplica 的代码。代码如下:

     1 case NewReplica =>
     2   // 遍历所有能够执行转换的副本对象
     3   validReplicas.foreach { replica =>
     4     // 获取该副本对象的分区对象,即<主题名,分区号>数据
     5     val partition = replica.topicPartition
     6     // 获取副本对象的当前状态
     7     val currentState = controllerContext.replicaState(replica)
     8     // 尝试从元数据缓存中获取该分区当前信息
     9     // 包括Leader是谁、ISR都有哪些副本等数据
    10     controllerContext.partitionLeadershipInfo.get(partition) match {
    11       // 如果成功拿到分区数据信息
    12       case Some(leaderIsrAndControllerEpoch) =>
    13         // 如果该副本是Leader副本
    14         if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
    15           val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
    16           // 记录错误日志。Leader副本不能被设置成NewReplica状态
    17           logFailedStateChange(replica, currentState, OfflineReplica, exception)
    18         // 否则,给该副本所在的Broker发送LeaderAndIsrRequest
    19         // 向它同步该分区的数据, 之后给集群当前所有Broker发送
    20         // UpdateMetadataRequest通知它们该分区数据发生变更
    21         } else {
    22           controllerBrokerRequestBatch
    23             .addLeaderAndIsrRequestForBrokers(
    24               Seq(replicaId),
    25               replica.topicPartition,
    26               leaderIsrAndControllerEpoch,
    27               controllerContext.partitionFullReplicaAssignment(
    28                 replica.topicPartition),
    29               isNew = true)
    30           if (traceEnabled)
    31             logSuccessfulTransition(
    32               stateLogger, replicaId, 
    33               partition, currentState, NewReplica)
    34           // 更新元数据缓存中该副本对象的当前状态为NewReplica
    35           controllerContext.putReplicaState(replica, NewReplica)
    36         }
    37       // 如果没有相应数据
    38       case None =>
    39         if (traceEnabled)
    40           logSuccessfulTransition(
    41             stateLogger, replicaId, 
    42             partition, currentState, NewReplica)
    43         // 仅仅更新元数据缓存中该副本对象的当前状态为NewReplica即可
    44         controllerContext.putReplicaState(replica, NewReplica)
    45     }
    46   }

    这一路主要做的事情是,尝试从元数据缓存中,获取这些副本对象的分区信息数据,包括分区的 Leader 副本在哪个 Broker 上、ISR 中都有哪些副本,等等。如果找不到对应的分区数据,就直接把副本状态更新为 NewReplica。否则,代码就需要给该副本所在的 Broker 发送请求,让它知道该分区的信息。同时,代码还要给集群所有运行中的 Broker 发送请求,让它们感知到新副本的加入。

    第 2 路:转换到 OnlineReplica 状态

    下面我们来看第 2 路,即转换副本对象到 OnlineReplica。刚刚我说过,这是副本对象正常工作时所处的状态。我们来看下要转换到这个状态,源码都做了哪些事情:

     1 case OnlineReplica =>
     2   validReplicas.foreach { replica =>
     3     // 获取副本所在分区
     4     val partition = replica.topicPartition
     5     // 获取副本当前状态
     6     val currentState = controllerContext.replicaState(replica)
     7     currentState match {
     8       // 如果当前状态是NewReplica
     9       case NewReplica =>
    10         // 从元数据缓存中拿到分区副本列表
    11         val assignment = controllerContext
    12           .partitionFullReplicaAssignment(partition)
    13         // 如果副本列表不包含当前副本,视为异常情况
    14         if (!assignment.replicas.contains(replicaId)) {
    15           error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
    16           // 将该副本加入到副本列表中,并更新元数据缓存中该分区的副本列表
    17           val newAssignment = assignment.copy(
    18             replicas = assignment.replicas :+ replicaId)
    19           controllerContext.updatePartitionFullReplicaAssignment(
    20             partition, newAssignment)
    21         }
    22       // 如果当前状态是其他状态
    23       case _ =>
    24         // 尝试获取该分区当前信息数据
    25         controllerContext.partitionLeadershipInfo
    26           .get(partition) match {
    27           // 如果存在分区信息
    28           // 向该副本对象所在Broker发送请求,令其同步该分区数据
    29           case Some(leaderIsrAndControllerEpoch) =>
    30             controllerBrokerRequestBatch
    31               .addLeaderAndIsrRequestForBrokers(Seq(replicaId),
    32                 replica.topicPartition,
    33                 leaderIsrAndControllerEpoch,
    34                 controllerContext
    35                   .partitionFullReplicaAssignment(partition), 
    36                 isNew = false)
    37           case None =>
    38         }
    39     }
    40     if (traceEnabled)
    41       logSuccessfulTransition(
    42         stateLogger, replicaId, 
    43         partition, currentState, OnlineReplica)
    44     // 将该副本对象设置成OnlineReplica状态
    45     controllerContext.putReplicaState(replica, OnlineReplica)
    46   }

    代码依然会对副本对象进行遍历,并依次执行下面的几个步骤。

    第 1 步,获取元数据中该副本所属的分区对象,以及该副本的当前状态。

    第 2 步,查看当前状态是否是 NewReplica。如果是,则获取分区的副本列表,并判断该副本是否在当前的副本列表中,假如不在,就记录错误日志,并更新元数据中的副本列表;如果状态不是 NewReplica,就说明,这是一个已存在的副本对象,那么,源码会获取对应分区的详细数据,然后向该副本对象所在的 Broker 发送 LeaderAndIsrRequest 请求,令其同步获知,并保存该分区数据。

    第 3 步,将该副本对象状态变更为 OnlineReplica。至此,该副本处于正常工作状态。

    第 3 路:转换到 OfflineReplica 状态

    最后,再来看下第 3 路分支。这路分支要将副本对象的状态转换成 OfflineReplica。我依然以代码注释的方式给出主要的代码逻辑:

     1 case OfflineReplica =>
     2   validReplicas.foreach { replica =>
     3     // 向副本所在Broker发送StopReplicaRequest请求,停止对应副本
     4     controllerBrokerRequestBatch
     5       .addStopReplicaRequestForBrokers(Seq(replicaId), 
     6         replica.topicPartition, deletePartition = false)
     7   }
     8   // 将副本对象集合划分成有Leader信息的副本集合和无Leader信息的副本集合
     9   val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = 
    10     validReplicas.partition { replica =>
    11       controllerContext.partitionLeadershipInfo
    12         .contains(replica.topicPartition)
    13     }
    14   // 对于有Leader信息的副本集合而言从,
    15   // 它们对应的所有分区中移除该副本对象并更新ZooKeeper节点
    16   val updatedLeaderIsrAndControllerEpochs = 
    17     removeReplicasFromIsr(replicaId,  
    18       replicasWithLeadershipInfo.map(_.topicPartition))
    19   // 遍历每个更新过的分区信息
    20   updatedLeaderIsrAndControllerEpochs.foreach {
    21     case (partition, leaderIsrAndControllerEpoch) =>
    22       stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
    23       // 如果分区对应主题并未被删除
    24       if (!controllerContext.isTopicQueuedUpForDeletion(
    25         partition.topic)) {
    26         // 获取该分区除给定副本以外的其他副本所在的Broker  
    27         val recipients = controllerContext
    28           .partitionReplicaAssignment(partition)
    29           .filterNot(_ == replicaId)
    30         // 向这些Broker发送请求更新该分区更新过的分区LeaderAndIsr数据
    31         controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
    32           recipients,
    33           partition,
    34           leaderIsrAndControllerEpoch,
    35           controllerContext.partitionFullReplicaAssignment(partition), 
    36           isNew = false)
    37       }
    38       val replica = PartitionAndReplica(partition, replicaId)
    39       val currentState = controllerContext.replicaState(replica)
    40       if (traceEnabled)
    41         logSuccessfulTransition(stateLogger, replicaId, 
    42           partition, currentState, OfflineReplica)
    43       // 设置该分区给定副本的状态为OfflineReplica
    44       controllerContext.putReplicaState(replica, OfflineReplica)
    45   }
    46   // 遍历无Leader信息的所有副本对象
    47   replicasWithoutLeadershipInfo.foreach { replica =>
    48     val currentState = controllerContext.replicaState(replica)
    49     if (traceEnabled)
    50       logSuccessfulTransition(stateLogger, replicaId, 
    51         replica.topicPartition, currentState, OfflineReplica)
    52      // 向集群所有Broker发送请求,更新对应分区的元数据
    53     controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(
    54       controllerContext.liveOrShuttingDownBrokerIds.toSeq,
    55       Set(replica.topicPartition))
    56     // 设置该分区给定副本的状态为OfflineReplica
    57     controllerContext.putReplicaState(replica, OfflineReplica)
    58   }

    首先,代码会给所有符合状态转换的副本所在的 Broker,发送 StopReplicaRequest 请求,显式地告诉这些 Broker 停掉其上的对应副本。Kafka 的副本管理器组件(ReplicaManager)负责处理这个逻辑。后面我们会用两节课的时间专门讨论 ReplicaManager 的实现,这里你只需要了解,StopReplica 请求被发送出去之后,这些 Broker 上对应的副本就停止工作了。其次,代码根据分区是否保存了 Leader 信息,将副本集合划分成两个子集:有 Leader 副本集合和无 Leader 副本集合。有无 Leader 信息并不仅仅包含 Leader,还有 ISR 和 controllerEpoch 等数据。不过,你大致可以认为,副本集合是根据有无 Leader 进行划分的。接下来,源码会遍历有 Leader 的子集合,向这些副本所在的 Broker 发送 LeaderAndIsrRequest 请求,去更新停止副本操作之后的分区信息,再把这些分区状态设置为 OfflineReplica。最后,源码遍历无 Leader 的子集合,执行与上一步非常类似的操作。只不过,对于无 Leader 而言,因为我们没有执行任何 Leader 选举操作,所以给这些副本所在的 Broker 发送的就不是 LeaderAndIsrRequest 请求了,而是 UpdateMetadataRequest 请求,显式去告知它们更新对应分区的元数据即可,然后再把副本状态设置为 OfflineReplica。从这段描述中,我们可以知道,把副本状态变更为 OfflineReplica 的主要逻辑,其实就是停止对应副本 + 更新远端 Broker 元数据的操作。

    PartitionStateMachine

    代码总共有 5 大部分。PartitionStateMachine:分区状态机抽象类。它定义了诸如 startup、shutdown 这样的公共方法,同时也给出了处理分区状态转换入口方法 handleStateChanges 的签名。ZkPartitionStateMachine:PartitionStateMachine 唯一的继承子类。它实现了分区状态机的主体逻辑功能。和 ZkReplicaStateMachine 类似,ZkPartitionStateMachine 重写了父类的 handleStateChanges 方法,并配以私有的 doHandleStateChanges 方法,共同实现分区状态转换的操作。PartitionState 接口及其实现对象:定义 4 类分区状态,分别是 NewPartition、OnlinePartition、OfflinePartition 和 NonExistentPartition。除此之外,还定义了它们之间的流转关系。PartitionLeaderElectionStrategy 接口及其实现对象:定义 4 类分区 Leader 选举策略。你可以认为它们是发生 Leader 选举的 4 种场景。PartitionLeaderElectionAlgorithms:分区 Leader 选举的算法实现。既然定义了 4 类选举策略,就一定有相应的实现代码,PartitionLeaderElectionAlgorithms 就提供了这 4 类选举策略的实现代码。

    每个 Broker 启动时,都会创建对应的分区状态机和副本状态机实例,但只有 Controller 所在的 Broker 才会启动它们。如果 Controller 变更到其他 Broker,老 Controller 所在的 Broker 要调用这些状态机的 shutdown 方法关闭它们,新 Controller 所在的 Broker 调用状态机的 startup 方法启动它们。

    分区状态

    既然 ZkPartitionStateMachine 是管理分区状态转换的,那么,我们至少要知道分区都有哪些状态,以及 Kafka 规定的转换规则是什么。这就是 PartitionState 接口及其实现对象做的事情。和 ReplicaState 类似,PartitionState 定义了分区的状态空间以及流转规则。我以 OnlinePartition 状态为例,说明下代码是如何实现流转的:

    1 sealed trait PartitionState {
    2   def state: Byte // 状态序号,无实际用途
    3   def validPreviousStates: Set[PartitionState] // 合法前置状态集合
    4 }
    5 
    6 case object OnlinePartition extends PartitionState {
    7   val state: Byte = 1
    8   val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
    9 }

    如代码所示,每个 PartitionState 都定义了名为 validPreviousStates 的集合,也就是每个状态对应的合法前置状态集。对于 OnlinePartition 而言,它的合法前置状态集包括 NewPartition、OnlinePartition 和 OfflinePartition。在 Kafka 中,从合法状态集以外的状态向目标状态进行转换,将被视为非法操作。目前,Kafka 为分区定义了 4 类状态。

    NewPartition:分区被创建后被设置成这个状态,表明它是一个全新的分区对象。处于这个状态的分区,被 Kafka 认为是“未初始化”,因此,不能选举 Leader。

    OnlinePartition:分区正式提供服务时所处的状态。

    OfflinePartition:分区下线后所处的状态。

    NonExistentPartition:分区被删除,并且从分区状态机移除后所处的状态。

    分区 Leader 选举的场景及方法

    刚刚我们说了两个状态机的相同点,接下来,我们要学习的分区 Leader 选举,可以说是 PartitionStateMachine 特有的功能了。每个分区都必须选举出 Leader 才能正常提供服务,因此,对于分区而言,Leader 副本是非常重要的角色。既然这样,我们就必须要了解 Leader 选举什么流程,以及在代码中是如何实现的。我们重点学习下选举策略以及具体的实现方法代码。

    <