• 售前

  • 售后

热门帖子
入门百科

Flink--Hybrid Source提出的动机及实现原理介绍

[复制链接]
123457287 显示全部楼层 发表于 2022-1-12 15:27:40 |阅读模式 打印 上一主题 下一主题
Hybrid Source的提出动机

在实践中,许多Flink作业需要按顺序从多个数据源读取数据,具体有如下2个场景:


  • Change Data Capture (CDC): 用户可以将数据的快照存储在HDFS/S3中,数据的更新日志存储在数据库binlog或Kafka中
  • 机器学习特征回填: 当向模型添加新特性时,需要从几个月前到现在的原始数据计算该特性。在大多数情况下,历史数据和实时数据分别存储在两种不同的存储系统中,例如HDFS和Kafka。
在过去,用户可能必须运行两个不同的Flink作业,或者在SourceFunction中进行一些修改来处理这种情况,针对大多数用户,这种实现过于复杂:


  • 基于当前不同连接器的代码实现,多个源之间的切换很复杂。如何在切换前控制上游源的具体状态,以及下游源如何将上游源的状态转换为初始状态具有重要意义。
  • 自动切换构成混合源的用户定义可切换源会导致复杂的实现。在大多数情况下,用户添加自定义源,Flink会按照指定的添加顺序自动切换这些源。
  • 目前还没有有效的机制来支持历史数据和实时数据之间平滑的源迁移,例如FileSystem和Kafka源之间的源迁移。平滑迁移需要定义源切换的规则和时间,以及使用什么凭证进行切换,以确保数据的完整性和一致性。
为了平滑地支持这种场景,Flink作业需要先从HDFS读取历史数据,然后切换到Kafka读取实时记录,所以需要引入了一个建立在新的Source API (FLIP-27)之上的“混合Source”API来帮助用户处理这种情况,
目标如下


  • 基于使用FLIP-27构建的现有的Source连接器,不需要做任何更改
  • 支持源的任意组合,形成混合源
实现原理

基本思路

混合源是包含具体源列表的源。混合源按定义的顺序读取每个源的数据。当A源读取数据结束时,它从A源切换到下一个B源。
在大多数情况下,混合源只包含两个源,但是如果需要,它可以包含更多的源。
切换方式如下:


  • 源的开始/结束位置预先配置,并包装到HybridSource中。不需要现有源提供特别的支持。
  • 通过切换时的位置转换,将当前源的结束位置转换为下一个源的开始位置,则需要现有的源提供如下支持:

    • 当前源的分割枚举器中提供结束位置,
    • 下一个源中支持设置开始位置(如KafkaSource支持从设置的开始时间戳读取数据),
    • 用户提供的函数:将当前源的结束位置转换为下一个源的开始位置。

HybridSource的功能如下:


  • HybridSource枚举器管理在两个源之间切换的过程。
  • 用户提供的SourceFactory实现用于在前一个Source完成读取时创建下一个Source。
  • SourceFactory预计会做以下工作:

    • 从前一个已完成的Source的SplitEnumerator中获取end_position。(这可能需要修改现有的源,如FileSource,暴露结束位置。)
    • 将end_position转换为下一个源的start_position。
    • 构建和设置下一个源。

  • 切换方式如下:

    • 源A以end_position结束
    • SourcePositionConverter接受end_position并使用初始化源B的start_position。

需要解决的问题



  • 获取源A的end_position。

    • 在FLIP-27中,有界性是Source实例的固有属性。然而,FLIP-27在源读取数据结束时,没有暴露end_position。

  • 根据源A的end_position初始化源B的start_position。

    • 源B的start_position类型通常与源a的end_position类型不同。
    • 位置的转换需要用户自定义的逻辑

  • Dynamic start position at switch time
    Example: File source reads a very large backlog, taking potentially longer than retention available for next source. Switch needs to occur at “current time - X”. This requires the start time for the next source to be set at switch time. Here we require transfer of end position from the previous file enumerator for deferred construction of KafkaSource by implementing SourceFactory.
    Note that enumerators need to support getting the end timestamp. This may currently require a source customization. Adding support for dynamic end position to FileSource is tracked in FLINK-23633.

源平滑切换示例

当使用FileSource作为要切换的源时,FileSplitEnumerator可以使用File记录的最大时间戳作为getEndState方法的返回值,该方法被视为文件系统源的END_POSITION。
当使用KafkaSource作为切换源时,KafkaSplitEnumerator可以使用上游源的最大时间戳作为setStartState方法的参数值,这被认为是Kafka消费者要寻找的START_POSITION。
示意图如下:

源切换代码示例

两种不同源切换的代码示例
  1. FileSource<String> fileSource =
  2.    FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
  3. KafkaSource<String> kafkaSource =
  4.            KafkaSource.<String>builder()
  5.                    .setBootstrapServers("localhost:9092")
  6.                    .setGroupId("MyGroup")
  7.                    .setTopics(Arrays.asList("quickstart-events"))
  8.                    .setDeserializer(
  9.                            KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
  10.                    .setStartingOffsets(OffsetsInitializer.earliest())
  11.                    .build();
  12. HybridSource<String> hybridSource =
  13.            HybridSource.builder(fileSource)
  14.                    .addSource(kafkaSource)
  15.                    .build();
  16. }
  17. HybridSource<String> hybridSource =
  18.     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
  19.         .addSource(
  20.             switchContext -> {
  21.               StaticFileSplitEnumerator previousEnumerator =
  22.                   switchContext.getPreviousEnumerator();
  23.               // how to get timestamp depends on specific enumerator
  24.               long timestamp = previousEnumerator.getEndTimestamp();
  25.               OffsetsInitializer offsets =
  26.                   OffsetsInitializer.timestamp(timestamp);
  27.               KafkaSource<String> kafkaSource =
  28.                   KafkaSource.<String>builder()
  29.                       .setBootstrapServers("localhost:9092")
  30.                       .setGroupId("MyGroup")
  31.                       .setTopics(Arrays.asList("quickstart-events"))
  32.                       .setDeserializer(
  33.                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
  34.                       .setStartingOffsets(offsets)
  35.                       .build();
  36.               return kafkaSource;
  37.             },
  38.             Boundedness.CONTINUOUS_UNBOUNDED)
  39.         .build();
复制代码
参考

FLIP-150: Introduce Hybrid Source
hybridsource

来源:https://blog.caogenba.net/penriver/article/details/122377396
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作