• 售前

  • 售后

热门帖子
入门百科

Flink Kafka 工具类(保证数据精准一致性)

[复制链接]
酱油的2017 显示全部楼层 发表于 2022-1-12 16:12:18 |阅读模式 打印 上一主题 下一主题
KafkaUtil 

  
  1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  2. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  4. import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
  5. import org.apache.kafka.clients.consumer.ConsumerConfig;
  6. import org.apache.kafka.clients.producer.ProducerConfig;
  7. import org.apache.kafka.clients.producer.ProducerRecord;
  8. import javax.annotation.Nullable;
  9. import java.util.Properties;
  10. /**
  11. * @author zyj
  12. * @Date 2022/1/6 15:29
  13. * 操作 kafka 的工具类
  14. */
  15. public class MyKafkaUtil {
  16.     private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
  17.     private static final String DEFAULT_TOPIC = "default_topic";
  18.     // 获取kafka的消费者
  19.     public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
  20.         Properties props = new Properties();
  21.         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
  22.         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  23.         return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
  24.     }
  25.     // 获取kafka的生产者(String数据类型)
  26.    /*
  27.    注意:下面这中实现只能保证数据不丢,不能保证精准一次性
  28.    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
  29.         return new FlinkKafkaProducer<String>(
  30.                 KAFKA_SERVER,
  31.                 topic,
  32.                 new SimpleStringSchema()
  33.         );
  34.     }*/
  35.     public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
  36.         Properties props = new Properties();
  37.         // 指定kafka服务端ip地址
  38.         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
  39.         // 设置生产超时时间
  40.         props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");
  41.         // 创建kafka生产者,自定义序列化器 KafkaSerializationSchema<String> 这里是指定String类型
  42.         return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
  43.             @Override
  44.             public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
  45.                 return new ProducerRecord<byte[], byte[]>(topic, str.getBytes());
  46.             }
  47.         }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  48.     }
  49.     // 获取kafka的生产者(通用数据类型)
  50.     public static <T> FlinkKafkaProducer<T> getKafkaSinkSchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
  51.         Properties props = new Properties();
  52.         // 指定kafka服务端ip地址
  53.         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
  54.         // 设置生产超时时间
  55.         props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");
  56.         return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  57.     }
  58. }
复制代码
来源:https://blog.caogenba.net/m0_48379126/article/details/122427216
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作