• 售前

  • 售后

热门帖子
入门百科

十亿条数据需要每天计算怎么办?Spark快速入门

[复制链接]
万象争辉1 来自手机 显示全部楼层 发表于 2022-1-14 06:56:25 |阅读模式 打印 上一主题 下一主题
(一)概述

前段时间公司规划了一个新的项目,我成了这个项目的负责人。在做技术选型时,有一个需求阻碍了前进的步伐。大概有十亿条数据,数据总量在六百G左右,这些海量的数据需要每天根据一定的逻辑计算得到几千万的值。当数据量达到这种程度时,Java应用已经无法支撑了,于是在技术选型时选中了大数据计算框架–Spark。
(二)什么是Spark

Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。主要用来做大数据的分析计算。Spark是一个分布式数据快速分析框架,提供了比MapReduce更丰富的模型,可以在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。针对数亿级别的计算需求,Spark可以将所有数据读入到内存中,按配置的不同在内部生成几十或者几百个算子同时计算,速度十分快。
Spark的主要模块分为以下几个:

Spark Core: 提供了Spark最基础与最核心的功能,Spark的其他模块都是在Spark Core上进行扩展。
Spark SQL:用来操作结构化数据的组件,通过SparkCore,用户可以使用SQL来查询数据。
Spark Streaming:Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流API。
Spark MLlib:一个机器学习算法库。
Spark GraphX:Spark面向图计算提供的框架和算法库。
(三)Spark 应用构建

Spark的源码是用scala语言写的,同时也支持Java版本。更推荐使用scala语言去写spark代码,但是对程序员而言有一定的成本,因此在项目比较急的情况下使用Java写也是没问题的。
Spark的生产环境中使用需要搭建一套Spark运行环境,目前我所在公司搭建的Spark集群内存达到了1T,完全可以把所有的数据放进内存中计算。同时Spark也支持本地直接调用,通过引入maven依赖即可。
首先介绍在Idea中如何搭建Spark环境,本文所使用的scala版本是2.12,运行项目前首先确保安装了scala环境。
首先创建一个Maven项目,项目结构如下:

安装Idea中的scala插件:

在Project Structure中将scala引入

选择Add Framework Suppor,将里面的scala勾选


创建一个Object类型的Scala文件

编写测试代码:
  1. object Test {
  2.   def main(args: Array[String]): Unit = {
  3.     println("hello world")
  4.   }
  5. }
复制代码
如果成功输出,说明环境一切正常。
(四) wordCount案例

WordCount是大数据界的HelloWorld,一个最经典的MapReduce案例,这个案例是用来统计每个单词出现的次数,下面进入正题。
首先在Idea中引入Spark相关的依赖,我用的Scala是2.12版本,需要和依赖对齐:
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.spark</groupId>
  4.         <artifactId>spark-core_2.12</artifactId>
  5.         <version>3.0.0</version>
  6.     </dependency>
  7. </dependencies>
复制代码
在sparkdemo根目录下创建一个文件夹data,在里面放两个文件分别是1.txt和2.txt,分别写上
  1. Hello world
  2. Hello scala
复制代码
编写WordCount程序,先介绍Java的使用,Spark中具体的代码含义会在后续博客中更新,整个程序做的事情就是统计两个文件中每个单词出现的次数,是最经典的MapReduce案例:
  1. public class JavaWordCount {
  2.     public static void main(String[] args) {
  3.         SparkConf conf = new SparkConf().setAppName("wordCount").setMaster("local");
  4.         JavaSparkContext sc = new JavaSparkContext(conf);
  5.         //读取文件转成RDD
  6.         JavaRDD<String> lines = sc.textFile("data/*");
  7.         //将每一行的单词根据空格拆分
  8.         JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
  9.         //将Hello转化为(Hello,1)这种格式
  10.         JavaPairRDD<String, Integer> wordToOne = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<String,Integer>(s,1));
  11.         //根据key进行统计
  12.         JavaPairRDD<String, Integer> wordToCount = wordToOne.reduceByKey((x, y) -> x + y);
  13.         //输出结果
  14.         wordToCount.foreach((VoidFunction<Tuple2<String, Integer>>) stringIntegerTuple2 -> System.out.println(stringIntegerTuple2._1+stringIntegerTuple2._2));
  15.         sc.close();
  16.     }
  17. }
复制代码
使用scala实现的版本如下:
  1. object WordCount {
  2.   def main(args: Array[String]): Unit = {
  3.     val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
  4.     val sparkContext = new SparkContext(sparkConf);
  5.     val lines: RDD[String] = sparkContext.textFile(path = "data/*");
  6.     val words: RDD[String] = lines.flatMap(_.split(" "))
  7.     val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
  8.     val wordToCount = wordToOne.reduceByKey((x, y) => x + y).foreach(println)
  9.     sparkContext.stop();
  10.   }
  11. }
复制代码
运行结果如下:

(五)总结

本文只要结合具体的需求引出Spark,并快速介绍了Spark能做的一些事情,希望对你有所启发。我是鱼仔,我们下期再见。

来源:https://blog.caogenba.net/qq_41973594/article/details/122463933
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作