• 售前

  • 售后

热门帖子
入门百科

美团二面:具体说说Kafka拉消息的过程?

[复制链接]
樱花283 显示全部楼层 发表于 2022-1-8 17:33:54 |阅读模式 打印 上一主题 下一主题
AbstractFetcherThread:拉取消息的步调

副本机制是Kafka实现数据高可靠性的底子:同一个分区下的多个副本分散在不同的Broker机器上,它们保存雷同的消息数据以实现高可靠性。那怎样确保所有副本上的数据同等性呢?最常见方案当属Leader/Follower备份机制(Leader/Follower Replication)。Kafka分区的:


  • 某个副本会被指定为Leader,负责相应客户端的读、写哀求
  • 其他副本主动成为Follower,被动同步Leader副本中的数据
    被动同步:Follower副本不断向Leader副本发送读取哀求,以获取Leader处写入的最新消息数据
本文就研究Follower副本怎样通过拉取线程实现这一目标。Follower副本在副本同步过程中,还可能发生截断(Truncation),其原理又是为何?
案例

这部分源码贴近底层设计架构原理。阅读它对我实际有啥用?
生产情况曾发现,一旦Broker上副本数过多,Broker内存占用就会很高。HeapDump后,发现在于ReplicaFetcherThread#buildFetch有这么一行代码:
  1. val builder = fetchSessionHandler.newBuilder()
复制代码
内部会实例化一个LinkedHashMap。若分区数许多,该Map会被扩容数次,带来大量不须要的数据拷贝,既增长内存Footprint,又浪费CPU。后续通过将负载转移到其他Broker解决该题目。
Kafka社区也发现了这个Bug,以是现在酿成:
修改后语句直接传入FETCH哀求中总的分区数,并直接将其传给LinkedHashMap,制止再实行扩容。
说回Follower副本从Leader副本拉取数据。Kafka就是通过ReplicaFetcherThread,副本获取线程实现的消息拉取及处理。
本文先从抽象基类AbstractFetcherThread研究,终极彻底搞明白Follower端同步Leader端消息的原理。
AbstractFetcherThread

抽象类,从Broker获取多个分区的消息数据,至于获取之后怎样对这些数据举行处理,则交由子类来实现。
类界说及字段


除了构造器的这几个字段,AbstractFetcherThread还界说了两个type类型。关键字type界说一个类型,可当做一个快捷方式,如FetchData:
  1. type FetchData = FetchResponse.PartitionData[Records]
复制代码
雷同快捷方式:凡源码用到FetchResponse.PartitionData[Records],都可使用FetchData更换,EpochData同理。
FetchData界说里的PartitionData类型,是客户端clients工程中FetchResponse类的嵌套类。FetchResponse类封装的是FETCH哀求的Response对象,其内PartitionData是个POJO,保存Response中单个分区数据拉取的各项数据:


  • 从该分区的Leader副本拉取回来的消息
  • 该分区的高水位值
  • 日志起始位移值



在PartitionData中,最需关注的是recordSet,保存了实际的消息聚集。


  • 注意到EpochData界说位置,它也是PartitionData类型,但EpochData的PartitionData是OffsetsForLeaderEpochRequest的PartitionData类型
    Kafka源码有许多名为PartitionData的嵌套类。许多哀求类型中的数据都是按分区层级分组,因此源码很自然地在这些哀求类中创建同名嵌套类。以是,注意区分PartitionData嵌套类是界说在哪类哀求中的!
分区读取状态类

AbstractFetcherThread构造器中,还有个**PartitionStates[PartitionFetchState]**类型的字段:


  • 泛型参数类型PartitionFetchState类,表征分区读取状态,保存分区的已读取位移值和对应副本状态。
这里的状态有二:
副本读取状态

副本读取状态由ReplicaState接口表现:
分区读取状态:



  • 可获取,表明副本获取线程当前能够读取数据。
  • 截断中,表明分区副本正在实行截断操纵(比如该副本刚刚成为Follower副本)。
  • 被推迟,表明副本获取线程获取数据时出现错误,须要等待一段时间后重试。
分区读取状态中的【可获取、截断中】与副本读取状态的【获取中、截断中】并非严酷对应。副本读取状态处获取中,并不肯定表现分区读取状态就是可获取状态。对于分区,它是否能被获取的条件要比副本严酷。

副本获取线程做的变乱,日志截断和消息获取:


  • isReplicaInSync,副本限流,出镜率不高
  • isDelayed,判定是否须要推迟获取对应分区的消息
    源码会不断调解那些不须要推迟的分区的读取顺序,以包管读取公平性。公平性实现在partitionStates字段的PartitionStates类,界说在clients工程。会吸收一组要读取的主题分区,然后轮询读取这些分区以确保公平性。
clients端源码自行查阅。
  1. public class PartitionStates<S> {
  2.     private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
  3.     ......
  4.     public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
  5.       map.remove(topicPartition);
  6.       map.put(topicPartition, state);
  7.       updateSize();
  8.     }
  9.     ......
  10. }
复制代码
PartitionStates轮询处理要读取的多个分区,依靠LinkedHashMap保存所有主题分区,其元素有明白迭代顺序,默以为元素插入的顺序。
假设Kafka要读5个分区的消息:A、B、C、D和E。若插入顺序:ABCDE,则起首读分区A。一旦A被读取后,为确保各分区都有同等时机被读取,代码需将A插入到分区列表的末了一位,这就是updateAndMoveToEnd:把A从map中移除,再插归去,如许A自然就处于列表的末了一位了。这便是PartitionStates的作用。
core API

processPartitionData、truncate、buildFetch和doWork,涵盖拉取线程所做的最紧张的3件事:


  • 构建FETCH哀求
  • 实行截断操纵
  • 处理拉取后的结果
doWork串联起前面的这3方法。
最紧张的processPartitionData,用于处理读取回来的消息聚集。它是个抽象方法,因此需子类实现它的逻辑。具体到Follower副本而言, 由ReplicaFetcherThread类实现:
  1. protected def processPartitionData(
  2.   topicPartition: TopicPartition,  // 读取哪个分区的数据
  3.   fetchOffset: Long,               // 读取到的最新位移值
  4.   partitionData: FetchData         // 读取到的分区消息数据
  5. ): Option[LogAppendInfo]           // 写入已读取消息数据前的元数据
复制代码
返回值Option[LogAppendInfo]:


  • 对Follower副本读消息写入日志,可忽略Option,由于肯定会返回具体LogAppendInfo实例,而不是None
  • LogAppendInfo类封装了许多消息数据被写入到日志前的紧张元数据信息,如首条消息的位移值、末了一条消息位移值、最大时间戳等
truncate

  1. protected def truncate(
  2.   topicPartition: TopicPartition, // 要对哪个分区下副本执行截断操作
  3.   truncationState: OffsetTruncationState  // Offset + 截断状态
  4. ): Unit
复制代码
OffsetTruncationState类告诉Kafka要把指定分区下副本截断到哪个位移值,封装了:


  • 一个位移值
  • 一个截断完成与否的布尔值状态
buildFetch
  1. protected def buildFetch(
  2.   // 一组要读取的分区列表
  3.   // 分区是否可读取取决于PartitionFetchState中的状态
  4.   partitionMap: Map[TopicPartition, PartitionFetchState]):
  5. // 封装FetchRequest.Builder对象
  6. ResultWithPartitions[Option[ReplicaFetch]]
复制代码
本质为指定分区构建对应FetchRequest.Builder对象,而该对象是构建FetchRequest的核心组件。Kafka中任何类型的消息读取,都是通过给指定Broker发送FetchRequest哀求来完成的。
doWork

串联前面3个方法的主要入口方法。
总结

本文研究Kafka的副本同步机制和副本管理器组件。Kafka副本间的消息同步依赖ReplicaFetcherThread线程。AbstractFetcherThread作为拉取线程的公共基类,AbstractFetcherThread类界说了许多紧张方法。


  • AbstractFetcherThread类:拉取线程的抽象基类。它界说了公共方法处理所有拉取线程的共同逻辑,如实行截断操纵,获取消息。
  • 拉取线程逻辑:循环实行截断操纵和获取数据操纵。
  • 分区读取状态:当前,源码界说了3类分区读取状态。拉取线程只能拉取处于可读取状态的分区的数据


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作