• 售前

  • 售后

热门帖子
入门百科

Kafka【付诸实践 04】Java实现筛选查询Kafka符合条件的最新数据(保证数据

[复制链接]
成哥337 显示全部楼层 发表于 2022-1-13 09:44:47 |阅读模式 打印 上一主题 下一主题
1.需求说明

业务上有大量从硬件采集到的数据通过Kafka入库GreenPlum数据库,虽然数据表已进行分区,每个分区少的有100+万条多的时候有1000+万条记录,现在有一个接口要获取最新的20条数据用来展示,即便是从单个分区上查询由于需要全量数据排序,时间长的时候需要7~8秒,这个时候就考虑直接从Kafka获取最新数据。
2.代码实现

2.1 配置信息

这里只贴出使用到的配置信息。
  1. # kafka的服务地址
  2. spring:
  3.   kafka:
  4.     bootstrap-servers: 127.0.0.1:xxxx
  5. # tableName与topic的映射
  6. tableNameKafkaTopic:
  7.   mapping: "{"table_name":"topic_name"}"
复制代码
  1. <!-- 用到了这个依赖里的时间工具 将字符串时间转换成 Date 这个方法也可以自己写 -->
  2. <dependency>
  3.         <groupId>cn.hutool</groupId>
  4.         <artifactId>hutool-all</artifactId>
  5.         <version>5.6.6</version>
  6. </dependency>
复制代码
2.2 映射对象

由于Kafka内的字段跟数据库的字段名称不同,这里要创建映射关系(仅保留几个字段用来说明问题)。
  1. @Data
  2. @ApiModel(value = "数据封装对象", description = "用于对Kafka内的数据进行封装")
  3. public class DataRes implements Serializable {
  4.    
  5.     @ApiModelProperty(name = "LOCATION", value = "设备位置")
  6.     @JsonProperty(value = "LOCATION")
  7.     private String location;
  8.    
  9.     @ApiModelProperty(name = "IP", value = "设备ID")
  10.     @JsonProperty(value = "IP")
  11.     private String equip;
  12.     @ApiModelProperty(name = "TME", value = "创建时间")
  13.     @JsonProperty(value = "TME")
  14.     private String tme;
  15.     @ConstructorProperties({"LOCATION", "IP", "TME"})
  16.     public DataGsmRes(String location, String ip, String tme) {
  17.         this.location = location;
  18.         this.equip = ip;
  19.         this.tme = tme;
  20.     }
  21. }
复制代码
Kafka的记录信息:
  1. {"LOCATION":"河南郑州","IP":"xxxx","TME":"2022-01-12 15:29:55"}
复制代码
接口返回的数据:
  1. {
  2.     "location": "河南郑州",
  3.     "equip": "xxxx",
  4.     "tme": "2022-01-12 15:29:55"
  5. }
复制代码
2.3 代码实现

为了简洁删掉了一些业务相关的代码。
  1.     @Value("${spring.kafka.bootstrap-servers}")
  2.     private String bootstrapServers;
  3.     @Value("${tableNameKafkaTopic.mapping}")
  4.     private String tableNameKafkaTopicMapping;
  5.     @Override
  6.     public BaseResult<PageEntity<Map>> queryNewest(Map mapParam) {
  7.         // 参数解析(根据tableName获取对应的Kafka主题)
  8.         String tableName = MapUtils.getString(mapParam, "table_name", "");
  9.         if (StringUtils.isBlank(tableName)) {
  10.             return BaseResult.getInstance(101, "数据源参数table_name不能为空!");
  11.         }
  12.         // 获取equip信息用来筛选数据
  13.         JSONObject jsonParam = new JSONObject();
  14.         try {
  15.             String paramStr = MapUtils.getString(mapParam, "param_json", "");
  16.             JSONObject json = JSONObject.parseObject(paramStr);
  17.             if (json != null) {
  18.                 for (String key : json.keySet()) {
  19.                     jsonParam.put(key.toLowerCase(), json.get(key));
  20.                 }
  21.             }
  22.         } catch (Exception e) {
  23.             return BaseResult.getInstance(102, "请求参数param_json非JSON格式");
  24.         }
  25.         Object equip = jsonParam.get("equip");
  26.         if (equip == null || StringUtils.isBlank(equip.toString())) {
  27.             return BaseResult.getInstance(101, "请求参数param_json内的equip不能为空!");
  28.         }
  29.         List<String> equipList = Arrays.asList(equip.toString().split(","));
  30.         // 从Kafka获取的符合条件的数据条数 equipKey用于筛选数据 timeKey用于排序
  31.         String equipKey = "IP";
  32.         String timeKey = "TME";
  33.         int pageSize = MapUtils.getInteger(mapParam, "pageSize");
  34.         int querySize = 1000;
  35.         int queryTime = 50;
  36.         int queryTotal = querySize * queryTime;
  37.         // 结果数据封装
  38.         List<Map> rows = new ArrayList<>();
  39.         List<Map> rowsSorted;
  40.         // 从Kafka获取最新数据
  41.         Properties props = new Properties();
  42.         props.put("bootstrap.servers", bootstrapServers);
  43.         props.put("group.id", "queryNewest");
  44.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  45.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  46.         props.put("max.poll.records", querySize);
  47.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  48.         // 查询主题数据
  49.         Object topicName = JSONObject.parseObject(tableNameKafkaTopicMapping).get(tableName);
  50.         if (topicName != null && StringUtils.isNotBlank(topicName.toString())) {
  51.             TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0);
  52.             List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
  53.             consumer.assign(topicPartitionList);
  54.             consumer.seekToEnd(topicPartitionList);
  55.             // 获取当前最大偏移量
  56.             long currentPosition = consumer.position(topicPartition);
  57.             int recordsCount;
  58.             try {
  59.                 for (int i = 1; i <= queryTime; i++) {
  60.                     long seekOffset = currentPosition - i * querySize;
  61.                     consumer.seek(topicPartition, seekOffset > 0 ? seekOffset : 0);
  62.                     ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
  63.                     recordsCount = records.count();
  64.                     for (ConsumerRecord<String, String> record : records) {
  65.                         queryTotal--;
  66.                         Map map = JSONObject.parseObject(record.value());
  67.                         String ip = MapUtils.getString(map, equipKey);
  68.                         if (equipList.size() == 0 || equipList.contains(ip)) {
  69.                             rows.add(map);
  70.                         }
  71.                     }
  72.                     // 获取数据(达到 pageSize 或 queryTotal 或消息队列无数据 即停止查询)
  73.                     if (rows.size() >= pageSize || queryTotal <= 0 || recordsCount <= 0) {
  74.                         break;
  75.                     }
  76.                 }
  77.             } finally {
  78.                 consumer.close();
  79.                 // 重新排序
  80.                 String finalTimeKey = timeKey;
  81.                 rowsSorted = rows.stream()
  82.                 .sorted(Comparator.comparingLong(row -> -DateUtil.parse(row.get(finalTimeKey).toString(), "yyyy-MM-dd HH:mm:ss").getTime()))
  83.                 .collect(Collectors.toList());
  84.             }
  85.         } else {
  86.             return BaseResult.getInstance(301, "不存在" + tableName + "对应的KafkaTopic!");
  87.         }
  88.         // 结果封装(截取pageSize个结果并映射key值)
  89.         List<Map> subList;
  90.         if (rowsSorted.size() > pageSize) {
  91.             subList = rowsSorted.subList(0, pageSize);
  92.         } else {
  93.             subList = rowsSorted;
  94.         }
  95.         List<Map> res = new ArrayList<>();
  96.         PageEntity<Map> pageEntity = new PageEntity<>();
  97.         // 重新封装Kafka数据(字段值映射)
  98.         res = subList.stream()
  99.                     .map(item -> JSONObject.parseObject(JSON.toJSONString(JSONObject.parseObject(item.toString(), DataRes.class)), Map.class))
  100.                     .collect(Collectors.toList());
  101.         }
  102.         pageEntity.setTotal(res.size());
  103.         pageEntity.setRows(res);
  104.         return BaseResult.getInstance(pageEntity);
  105.     }
复制代码
3.算法分析


  • 筛选的是查询时最大偏移量向前queryTotal条数据,这个可以根据业务进行调整。
  • 重新封装Kafka数据的算法实际上是修改map对象key的方法。
  • max.poll.records参数明确了每次poll的记录数便于统计。
  • 特别注意:当前代码适用的是Partition只有1️⃣个的情况,多个分区的情况需要先查询分区数,再轮询获取每个分区的最新数据,合并后重新排序。
  1. // 仅查询了1个分区
  2. TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0);
  3. // 获取主题的分区列表
  4. List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName.toString());
  5. //Partition(topic = gp_gsmdata, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
复制代码
来源:https://blog.caogenba.net/yuanzhengme/article/details/122454111
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作