• 售前

  • 售后

热门帖子
入门百科

sparkSql数据离线处理--整理记录

[复制链接]
海沙心诖 显示全部楼层 发表于 2022-1-12 16:24:01 |阅读模式 打印 上一主题 下一主题
sparkSql数据离线处理

前言:本文作为本人学习sparkSql离线数据抽取,离线数据处理的学习整理记录,文中参考博客均附上原文链接。
一、Hive环境准备

1、配置文件准备:

/opt/hive/conf/hive-site.xml:(2021/12/31修改,添加了&useSSL=false&useUnicode=true&characterEncoding=utf8支持中文编码)
  1. <?xml version="1.0" encoding="UTF-8" standalone="no"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4.   <property>
  5.     <name>javax.jdo.option.ConnectionURL</name>
  6.     <value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8</value>
  7.     <description>hive的元数据库 </description>
  8.   </property>
  9.   <property>
  10.     <name>javax.jdo.option.ConnectionDriverName</name>
  11.     <value>com.mysql.jdbc.Driver</value>
  12.     <description>mysql的驱动jar包 </description>
  13.   </property>
  14.   <property>
  15.     <name>javax.jdo.option.ConnectionUserName</name>
  16.     <value>root</value>
  17.     <description>设定数据库的用户名 </description>
  18.   </property>
  19.   <property>
  20.     <name>javax.jdo.option.ConnectionPassword</name>
  21.     <value>xxx</value>
  22.     <description>设定数据库的密码</description>
  23.    </property>
  24. <!--zbt添加-->
  25.    <property>
  26.       <name>hive.exec.max.dynamic.partitions</name>
  27.       <value>100000</value>
  28.       <description>在所有执行MR的节点上,最大一共可以创建多少个动态分区</description>
  29.    </property>
  30.    <property>
  31.       <name>hive.exec.max.dynamic.partitions.pernode</name>
  32.       <value>100000</value>
  33.       <description>在所有执行MR的节点上,最大可以创建多少个动态分区</description>
  34.   </property>
  35. </configuration>
复制代码
若要在idea环境下运行要把
hdfs-site.xml
core-site.xml

hive-site.xml

放到resources文件夹中

否则hive.exec.max.dynamic.partitions.pernode,hive.exec.max.dynamic.partitions
配置不生效
2、hosts设置

若在不同网络环境下
需设置本地hosts

设置的内容为集群主机名
Ubuntu的hosts文件在 /etc 下
参考资料:(10条消息) java.lang.IllegalArgumentException: java.net.UnknownHostException: xxx_小健的博客-CSDN博客
3、远程连接服务开启

hive --service metastore
参考资料:(13条消息) hive的几种启动方式_lbl的博客-CSDN博客_hive启动
4、其他

mysql服务启动
service mysqld start
防火墙关闭
systemctl stop firewalld
二、IDEA环境准备

1、pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>sparkDome1</artifactId>
  7.         <groupId>org.example</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>HiveAndMysql</artifactId>
  12.     <properties>
  13.         <maven.compiler.source>8</maven.compiler.source>
  14.         <maven.compiler.target>8</maven.compiler.target>
  15.         <hadoop.version>2.7.7</hadoop.version>
  16.         <spark.version>2.1.1</spark.version>
  17.         <scala.version>2.11</scala.version>
  18.     </properties>
  19.     <dependencies>
  20.         <!--hadoop依赖-->
  21.         <dependency>
  22.             <groupId>org.apache.hadoop</groupId>
  23.             <artifactId>hadoop-client</artifactId>
  24.             <version>${hadoop.version}</version>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.apache.hadoop</groupId>
  28.             <artifactId>hadoop-common</artifactId>
  29.             <version>${hadoop.version}</version>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.apache.hadoop</groupId>
  33.             <artifactId>hadoop-hdfs</artifactId>
  34.             <version>${hadoop.version}</version>
  35.         </dependency>
  36.         <!--scala依赖-->
  37.         <dependency>
  38.             <groupId>org.scala-lang</groupId>
  39.             <artifactId>scala-library</artifactId>
  40.             <version>2.11.0</version>
  41.         </dependency>
  42.         <!--spark依赖-->
  43.         <dependency>
  44.             <groupId>org.apache.spark</groupId>
  45.             <artifactId>spark-core_${scala.version}</artifactId>
  46.             <version>${spark.version}</version>
  47.         </dependency>
  48.         <dependency>
  49.             <groupId>org.apache.spark</groupId>
  50.             <artifactId>spark-sql_${scala.version}</artifactId>
  51.             <version>${spark.version}</version>
  52.         </dependency>
  53.         <dependency>
  54.             <groupId>org.apache.spark</groupId>
  55.             <artifactId>spark-streaming_${scala.version}</artifactId>
  56.             <version>${spark.version}</version>
  57.         </dependency>
  58.         <dependency>
  59.             <groupId>org.apache.spark</groupId>
  60.             <artifactId>spark-mllib_${scala.version}</artifactId>
  61.             <version>${spark.version}</version>
  62.         </dependency>
  63.         <!--hive依赖-->
  64.         <dependency>
  65.             <groupId>org.apache.spark</groupId>
  66.             <artifactId>spark-hive_${scala.version}</artifactId>
  67.             <version>${spark.version}</version>
  68.         </dependency>
  69.         <dependency>
  70.             <groupId>mysql</groupId>
  71.             <artifactId>mysql-connector-java</artifactId>
  72.             <version>5.1.48</version>
  73.         </dependency>
  74.     </dependencies>
  75.     <build>
  76.         <plugins>
  77.             <plugin>
  78.                 <groupId>org.scala-tools</groupId>
  79.                 <artifactId>maven-scala-plugin</artifactId>
  80.                 <version>2.15.2</version>
  81.                 <executions>
  82.                     <execution>
  83.                         <id>scala-compile</id>
  84.                         <goals>
  85.                             <goal>compile</goal>
  86.                         </goals>
  87.                         <configuration>
  88.                             <!--includes是一个数组,包含要编译的code-->
  89.                             <includes>
  90.                                 <include>**/*.scala</include>
  91.                             </includes>
  92.                         </configuration>
  93.                     </execution>
  94.                     <execution>
  95.                         <id>scala-test-compile</id>
  96.                         <goals>
  97.                             <goal>testCompile</goal>
  98.                         </goals>
  99.                     </execution>
  100.                 </executions>
  101.             </plugin>
  102.         </plugins>
  103.     </build>
  104. </project>
复制代码
2、Hadoop环境

window下运行需要准备Hadoop环境
在代码编写中指定hadoop.home.dir
System.setProperty("hadoop.home.dir","........")

3、其他

Scala插件依赖需先下载好
注意环境与集群对应,本文档的环境为Scala-11
三、代码编写

1、全量抽取

  1. import org.apache.spark.sql.SparkSession
  2. /**
  3.    * mysql->hive 全量抽取
  4.    */
  5.    object ShopTest {
  6.   def main(args: Array[String]): Unit = {
  7.         //设置用户名,防止因为权限不足无法创建文件
  8.     System.setProperty("HADOOP_USER_NAME", "root")
  9.     //获取实例对象
  10.     val spark = SparkSession.builder()
  11.       .appName("ShopTest")
  12.       .master("local[*]")
  13.       .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
  14.       .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
  15.       .enableHiveSupport()
  16.       .getOrCreate()
  17.     //jdbc连接配置
  18.     val mysqlMap = Map(
  19.       "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false",
  20.       "user" -> "root",
  21.       "password" -> "xxx",
  22.       "driver" -> "com.mysql.jdbc.Driver"
  23.     )
  24.     //使用jdbc抽取mysql表数据
  25.     val inputTable = spark.read.format("jdbc")
  26.       .options(mysqlMap)
  27.       .option("dbtable", "EcData_tb_1")
  28.       .load()
  29.    
  30.     //    inputTable.show()
  31.       
  32.     //将mysql表数据创建为临时表
  33.     inputTable.createOrReplaceTempView("inputTable")
  34.     //hive动态分区开启
  35.     spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
  36.     //hive分区模式设置,默认为strict严格模式,若设置分区必须要有一个静态分区
  37.     //需要设置为nonstrict模式,可以都是动态分区
  38.     spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  39.     //hive分区数设置,目前版本已无法在程序中设置,参考上文Hive环境准备-配置文件准备
  40.     spark.sqlContext.sql("set hive.exec.max.dynamic.partitions.pernode = 10000")
  41.     spark.sqlContext.sql("set hive.exec.max.dynamic.partitions = 10000")
  42.     // mysql表结构,通过desc table tb_name;命令可获取
  43.     /*    +-------------+---------+------+-----+---------+-------+
  44.         | Field       | Type    | Null | Key | Default | Extra |
  45.         +-------------+---------+------+-----+---------+-------+
  46.         | InvoiceNo   | text    | YES  |     | NULL    |       |
  47.         | StockCode   | text    | YES  |     | NULL    |       |
  48.         | Description | text    | YES  |     | NULL    |       |
  49.         | Quantity    | int(11) | YES  |     | NULL    |       |
  50.         | InvoiceDate | text    | YES  |     | NULL    |       |
  51.         | UnitPrice   | double  | YES  |     | NULL    |       |
  52.         | CustomerID  | int(11) | YES  |     | NULL    |       |
  53.         | Country     | text    | YES  |     | NULL    |       |
  54.         +-------------+---------+------+-----+---------+-------+*/
  55.    
  56.     //于hive数据库,ods层中创建表
  57.     spark.sqlContext.sql(
  58.       """
  59.         |create table if not exists clown_test_db.ShopTest_ods_tb_1
  60.         |(
  61.         | InvoiceNo string ,
  62.         | StockCode string ,
  63.         | Description string ,
  64.         | Quantity int ,
  65.         | InvoiceDate string ,
  66.         | UnitPrice double ,
  67.         | CustomerID int ,
  68.         | Country string
  69.         |)
  70.         |partitioned by (country_pid string,customer_pid int)
  71.         |row format delimited
  72.         |fields terminated by '\t'  //本数据中字段值存在','不能用','作为分隔符
  73.         |lines terminated by '\n'
  74.         |stored as textfile
  75.         |""".stripMargin)
  76.       
  77.     //使用sql-insert into 语句将mysql数据全部导入hive表中
  78.     spark.sqlContext.sql(
  79.       """
  80.         |insert into table clown_test_db.ShopTest_ods_tb_1 partition(country_pid,customer_pid)
  81.         |select *,Country,CustomerID from inputTable
  82.         |""".stripMargin)
  83.   }
  84. }
复制代码
2、增量抽取

  1. import java.text.SimpleDateFormat
  2. import org.apache.spark.sql.{SaveMode, SparkSession}
  3. /**
  4. * hive_ods -> hive_dwd 增量抽取
  5.    */
  6.    object ShopTest2 {
  7.   def main(args: Array[String]): Unit = {
  8.     System.setProperty("HADOOP_USER_NAME","root")
  9.     val spark = SparkSession.builder()
  10.       .appName("ShopTest2")
  11.       .master("local[*]")
  12.       .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
  13.       .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
  14.       .enableHiveSupport()
  15.       .getOrCreate()
  16.    
  17.     /*    +-------------+---------+------+-----+---------+-------+
  18.     | Field       | Type    | Null | Key | Default | Extra |
  19.     +-------------+---------+------+-----+---------+-------+
  20.     | InvoiceNo   | text    | YES  |     | NULL    |       |
  21.     | StockCode   | text    | YES  |     | NULL    |       |
  22.     | Description | text    | YES  |     | NULL    |       |
  23.     | Quantity    | int(11) | YES  |     | NULL    |       |
  24.     | InvoiceDate | text    | YES  |     | NULL    |       |
  25.     | UnitPrice   | double  | YES  |     | NULL    |       |
  26.     | CustomerID  | int(11) | YES  |     | NULL    |       |
  27.     | Country     | text    | YES  |     | NULL    |       |
  28.     +-------------+---------+------+-----+---------+-------+*/
  29.         //隐式转换,sql方法导入
  30.     import spark.implicits._
  31.     import org.apache.spark.sql.functions._
  32.    
  33.     spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
  34.     spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  35.    
  36.     //直接通过sql语句获取到hive ods层中的表数据
  37.     val inputData = spark.sqlContext.sql("select * from clown_test_db.ShopTest_ods_tb_1")
  38.    
  39.     //设置时间条件
  40.     val timeStr = "2011/01/01 00:00"
  41.     val timeTemp = new SimpleDateFormat("yyyy/MM/dd HH:mm").parse(timeStr).getTime//单位为ms
  42.     println(timeTemp)
  43.    
  44.     //未转换前的数据格式为:12/8/2010 9:53
  45.     val timeFormat = inputData
  46.       .withColumn("InvoiceDate",unix_timestamp($"InvoiceDate","MM/dd/yyyy HH:mm"))//时间戳获取,单位为s
  47.       .where(s"InvoiceDate>$timeTemp/1000")//增量条件判断
  48.       .withColumn("InvoiceDate",from_unixtime($"InvoiceDate","yyyy/MM/dd HH:mm"))//时间格式转换
  49.       .where("Country='United Kingdom' or Country = 'Finland'")//筛选出国家名为United Kingdom 或 Finland的数据
  50.    
  51.     //由于该ods层表与目标dwd层表结构相同,直接用like语句创建结构相同的dwd表
  52.     spark.sqlContext.sql(
  53.       """
  54.         |create table if not exists clown_dwd_db.shoptest_dwd_tb_1
  55.         |like clown_test_db.ShopTest_ods_tb_1
  56.         |""".stripMargin)
  57.     //使用sparkSql算子将数据由ods表数据增量抽取到dwd表中
  58.     timeFormat.write.format("hive")
  59.       .mode(SaveMode.Append)
  60.       .insertInto("clown_dwd_db.shoptest_dwd_tb_1")
  61.   }
  62. }
复制代码
3、数据清洗

  1. import org.apache.spark.sql.{SaveMode, SparkSession}
  2. /**
  3. * hive_dwd->hive_dwd 缺失值剔除与填充
  4. */
  5. object ShopTest3 {
  6. /*+-------------+---------+------+-----+---------+-------+
  7.    | Field       | Type    | Null | Key | Default | Extra |
  8.    +-------------+---------+------+-----+---------+-------+
  9.    | InvoiceNo   | text    | YES  |     | NULL    |       |
  10.    | StockCode   | text    | YES  |     | NULL    |       |
  11.    | Description | text    | YES  |     | NULL    |       |
  12.    | Quantity    | int(11) | YES  |     | NULL    |       |
  13.    | InvoiceDate | text    | YES  |     | NULL    |       |
  14.    | UnitPrice   | double  | YES  |     | NULL    |       |
  15.    | CustomerID  | int(11) | YES  |     | NULL    |       |
  16.    | Country     | text    | YES  |     | NULL    |       |
  17.    +-------------+---------+------+-----+---------+-------+*/
  18. def main(args: Array[String]): Unit = {
  19.    System.setProperty("HADOOP_USER_NAME","root")
  20.    val spark = SparkSession.builder()
  21.      .appName("ShopTest3")
  22.      .master("local[*]")
  23.      .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
  24.      .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
  25.      .enableHiveSupport()
  26.      .getOrCreate()
  27.    import spark.implicits._
  28.    import org.apache.spark.sql.functions._
  29.    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
  30.    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  31.    val data = spark.sqlContext.sql("select * from clown_dwd_db.shoptest_dwd_tb_1")
  32.    spark.sqlContext.sql(
  33.      """
  34.        |create table if not exists clown_dwd_db.shopTest_dwd_tb_3
  35.        |(
  36.        | InvoiceNo string ,
  37.        | StockCode string ,
  38.        | Description string ,
  39.        | Quantity int ,
  40.        | InvoiceDate string ,
  41.        | UnitPrice double ,
  42.        | CustomerID int ,
  43.        | Country string
  44.        |)
  45.        |partitioned by (country_pid string)
  46.        |row format delimited
  47.        |fields terminated by '\t'
  48.        |lines terminated by '\n'
  49.        |stored as textfile
  50.        |""".stripMargin)
  51.   //使用na.fill对缺失值进行填充
  52.   //使用na.drop对缺失值进行剔除
  53.    data.na.fill(
  54.      Map(
  55.        "Country"->"Country_Null",
  56.        "CustomerID"->0
  57.      )
  58.    )
  59.      .na.drop(
  60.      Seq("UnitPrice","Quantity")
  61.    )      .selectExpr("InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country","Country")//由于数据中存在分区表字段,且该字段关联数据已改变,需要重新进行赋值
  62.      .limit(10000)
  63.      .write
  64.      .format("hive")
  65.      .mode(SaveMode.Append)
  66.      .insertInto("clown_dwd_db.shopTest_dwd_tb_3")
  67.   }
  68. }
复制代码
4、指标计算

  1. import org.apache.spark.sql.SparkSession
  2. /**
  3.   * sparkSql算子实现指标计算
  4.   */
  5. object ShopTest4 {
  6.   /*    +-------------+---------+------+-----+---------+-------+
  7. | Field       | Type    | Null | Key | Default | Extra |
  8. +-------------+---------+------+-----+---------+-------+
  9. | InvoiceNo   | text    | YES  |     | NULL    |       |
  10. | StockCode   | text    | YES  |     | NULL    |       |
  11. | Description | text    | YES  |     | NULL    |       |
  12. | Quantity    | int(11) | YES  |     | NULL    |       |
  13. | InvoiceDate | text    | YES  |     | NULL    |       |
  14. | UnitPrice   | double  | YES  |     | NULL    |       |
  15. | CustomerID  | int(11) | YES  |     | NULL    |       |
  16. | Country     | text    | YES  |     | NULL    |       |
  17. +-------------+---------+------+-----+---------+-------+*/
  18.   def main(args: Array[String]): Unit = {
  19.     System.setProperty("HADOOP_USER_NAME", "root")
  20.    
  21.     val spark = SparkSession.builder()
  22.       .appName("ShopTest4")
  23.       .master("local[*]")
  24.       .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
  25.       .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
  26.       .enableHiveSupport()
  27.       .getOrCreate()
  28.    
  29.     import spark.implicits._
  30.     import org.apache.spark.sql.functions._
  31.    
  32.     spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
  33.     spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  34.    
  35.     val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
  36.    
  37.     /**
  38.      * 统计每个国家的客户数,输出结果。
  39.      * 排序后输出客户最多的10个国家
  40.      */
  41.    
  42.         data.dropDuplicates("CustomerID","Country")//去重
  43.           .withColumn("x",lit(1))//添加一列数据都为1
  44.           .groupBy("Country")//聚合国家字段
  45.           .sum("x")//对1数据字段进行累加
  46.           .show(20)
  47.    
  48.     /**
  49.      * 统计各个国家的总销售额分布情况
  50.      */
  51.    
  52.     data.withColumn("x", $"Quantity" * $"UnitPrice")//添加销售额字段,值为数量*单价
  53.       .groupBy("Country")//聚合国家字段
  54.       .sum("x")//计算总销售额
  55.       .withColumn("sum(x)", round($"sum(x)", 2))//对结果字段进行四舍五入到两位,但round会对最后一位0省略,最好使用其他函数
  56.     /*若题目要求输出格式可进行rdd转换
  57.       .rdd
  58.       .map(x=>x.mkString(","))
  59.       .foreach(println(_))
  60.     */
  61.     /**
  62.      * 统计每种商品的销量,输出结果
  63.      * 排序后输出销量最高的10种商品
  64.      */
  65.     data.groupBy("StockCode")//聚合商品编码字段
  66.       .sum("Quantity")//计算销量
  67.       .coalesce(1)//将spark分区设置为1,防止后面排序混乱
  68.       .orderBy(desc("sum(Quantity)"))//由大到小排序
  69.       .show(10)
  70.    
  71.     /**
  72.      * 统计月销售额随时间的变化趋势
  73.      * [月份,销售额]
  74.      */
  75.     data.withColumn("InvoiceDate",substring_index($"InvoiceDate","/",2))//由于数据在增量抽取阶段已进行时间格式转换,可直接进行切割得出 年份/月份 的格式,substring_index与split不同
  76.       .withColumn("x",$"Quantity"*$"UnitPrice")//计算销售额
  77.       .groupBy("InvoiceDate")//对月份进行聚合
  78.       .sum("x")//计算总销售额
  79.       .coalesce(1)//设置spark分区为1
  80.       .orderBy(desc("InvoiceDate"))//由大到小排序
  81.       .withColumn("sum(x)",round($"sum(x)",2))//四舍五入到2位
  82.       .show(100)
  83.    
  84.     /**
  85.      * 统计商品描述中,排名前300(Top300)的热门关键词
  86.      */
  87.     data.select(col("Description"))//商品将描述字段单独查询
  88.       .flatMap(x=>x.toString().split("\\W"))//进行flatMap 切割后展平,切割\\W为正则匹配模式,匹配所有符号
  89.       .withColumn("x",lit(1))//增加1的数据列
  90.       .groupBy("value")//展平后字段名为value,进行聚合
  91.       .sum("x")//累加1数据
  92.       .where("value != '' ")//筛除空白数据
  93.       .coalesce(1)//设置spark分区为1
  94.       .orderBy(desc("sum(x)"))//由大到小排序
  95.       .show(300)//展示300条
  96.   }
  97. }
复制代码
  1. import org.apache.spark.sql.SparkSession
  2. /**
  3.   *  sql语句实现指标计算
  4.   */
  5. object ShopTest5 {
  6.   /*    +-------------+---------+------+-----+---------+-------+
  7. | Field       | Type    | Null | Key | Default | Extra |
  8. +-------------+---------+------+-----+---------+-------+
  9. | InvoiceNo   | text    | YES  |     | NULL    |       |
  10. | StockCode   | text    | YES  |     | NULL    |       |
  11. | Description | text    | YES  |     | NULL    |       |
  12. | Quantity    | int(11) | YES  |     | NULL    |       |
  13. | InvoiceDate | text    | YES  |     | NULL    |       |
  14. | UnitPrice   | double  | YES  |     | NULL    |       |
  15. | CustomerID  | int(11) | YES  |     | NULL    |       |
  16. | Country     | text    | YES  |     | NULL    |       |
  17. +-------------+---------+------+-----+---------+-------+*/
  18.   def main(args: Array[String]): Unit = {
  19.     System.setProperty("HADOOP_USER_NAME", "root")
  20.    
  21.     val spark = SparkSession.builder()
  22.       .appName("ShopTest5")
  23.       .master("local[*]")
  24.       .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
  25.       .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
  26.       .enableHiveSupport()
  27.       .getOrCreate()
  28.    
  29.     import spark.implicits._
  30.     import org.apache.spark.sql.functions._
  31.    
  32.     spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
  33.     spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  34.    
  35.     val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
  36.    
  37.     data.createOrReplaceTempView("dataTable")
  38.    
  39.     /**
  40.      * 统计每个国家的客户数,输出结果。
  41.      * 排序后输出客户最多的10个国家
  42.      */
  43.    //对去重后的Country,CustomerID进行聚合统计即可得出各个国家的客户数
  44.    spark.sqlContext.sql(
  45.      """
  46.        |select Country,count(distinct Country,CustomerID) from dataTable group by Country
  47.        |""".stripMargin)
  48.      .show()
  49.     /**
  50.      * 统计各个国家的总销售额分布情况
  51.      */
  52.    
  53.     spark.sqlContext.sql(
  54.       """
  55.         |select Country ,round(sum(Quantity*UnitPrice),2)
  56.         |from dataTable
  57.         |group by Country
  58.         |""".stripMargin)
  59.       .show()
  60.    
  61.     /**
  62.      * 统计每种商品的销量,输出结果
  63.      * 排序后输出销量最高的10种商品
  64.      */
  65.    
  66.     spark.sqlContext.sql(
  67.       """
  68.         |select StockCode,round(sum(Quantity*UnitPrice),2) as xl
  69.         |from dataTable
  70.         |group by StockCode
  71.         |order by xl desc
  72.         |""".stripMargin)
  73.       .show(10)
  74.    
  75.     /**
  76.      * 统计月销售额随时间的变化趋势
  77.      * [月份,销售额]
  78.      */
  79.    
  80.     //group by执行优先度可能高于 as 重命名,因此as后的名字无法用于group by 聚合
  81.     spark.sqlContext.sql(
  82.       """
  83.         |select substring_index(InvoiceDate,"/",2) as time,round(sum(Quantity*UnitPrice),2) as sum
  84.         |from dataTable
  85.         |group by substring_index(InvoiceDate,"/",2)
  86.         |order by substring_index(InvoiceDate,"/",2)
  87.         |""".stripMargin)
  88.       .show()
  89.    
  90.     /**
  91.      * 统计商品描述中,排名前300(Top300)的热门关键词
  92.      */
  93.    
  94.     //目前认为该题用sql解法没有必要
  95.       //- -
  96.   }
  97. }
复制代码
四、其他

1、hive分区的增删改查

参考资料:(15条消息) HIve学习:Hive分区修改_u011047968的专栏-CSDN博客_hive修改分区
hive表新增分区:[]内的不必要
  1. alter table tb_name add partition (pid1 = ‘’,pid2 = ) [location ‘xxx’]
复制代码
多个分区
  1. alter table tb_name add partition (pid1 = ‘’,pid2 = ) partition (pid1 = ‘’,pid2 = ) [location ‘xxx’]
复制代码
hive表修改分区:
  1. alter table tb_name partition(pid1='') rename to partition(pid1='');/*修改分区名*/
  2. alter table tb_name partition(pid1='') set location 'hdfs://master:8020/....';/*修改分区路径,注意使用绝对路径*/  
  3. alter table tb_name partition column (pid1 string);/*修改分区字段数据类型*/
复制代码
hive表删除分区:
  1. alter table tb_name drop partition (pid1 = ‘’,pid2 = )[ partition (pid1 = ‘’,pid2 = )…]
复制代码
hive分区值查询:
  1. show partitions tb_name;
复制代码
2、spark打包运行

命令:
spark-submit --class ShopTest4 --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar
若使用了jdbc连接,需要指明驱动jar包 mysql-connector-java-5.1.48.jar
spark-submit --jars mysql-connector-java-5.1.48.jar --class ShopTest --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar
或者将mysql驱动放至 $‘spark_home’/jars 目录下

3、时间格式

时间模式字符串用来指定时间格式。在此模式中,所有的 ASCII 字母被保留为模式字母,定义如下:
字母描述示例G纪元标记ADy四位年份2001M月份July or 07d一个月的日期10hA.M./P.M. (1~12)格式小时12H一天中的小时 (0~23)22m分钟数30s秒数55S毫秒数234E星期几TuesdayD一年中的日子360F一个月中第几周的周几2 (second Wed. in July)w一年中第几周40W一个月中第几周1aA.M./P.M. 标记PMk一天中的小时(1~24)24KA.M./P.M. (0~11)格式小时10z时区Eastern Standard Time’文字定界符Delimiter"单引号`4、Scala正则表达式

Scala 的正则表达式继承了 Java 的语法规则,Java 则大部分使用了 Perl 语言的规则。
下表我们给出了常用的一些正则表达式规则:(注意:\需要转义,算子中写为\,sql语句中写为\\\)
表达式匹配规则^匹配输入字符串开始的位置。$匹配输入字符串结尾的位置。.匹配除"\r\n"之外的任何单个字符。[…]字符集。匹配包含的任一字符。例如,"[abc]“匹配"plain"中的"a”。[^…]反向字符集。匹配未包含的任何字符。例如,"[^abc]“匹配"plain"中"p”,“l”,“i”,“n”。\A匹配输入字符串开始的位置(无多行支持)\z字符串结尾(类似$,但不受处理多行选项的影响)\Z字符串结尾或行尾(不受处理多行选项的影响)re*重复零次或更多次re+重复一次或更多次re?重复零次或一次re{ n}重复n次re{ n,}re{ n, m}重复n到m次a|b匹配 a 或者 b(re)匹配 re,并捕获文本到自动命名的组里(?: re)匹配 re,不捕获匹配的文本,也不给此分组分配组号(?> re)贪婪子表达式\w匹配字母或数字或下划线或汉字\W匹配任意不是字母,数字,下划线,汉字的字符\s匹配任意的空白符,相等于 [\t\n\r\f]\S匹配任意不是空白符的字符\d匹配数字,类似 [0-9]\D匹配任意非数字的字符\G当前搜索的开头\n换行符\b通常是单词分界位置,但如果在字符类里使用代表退格\B匹配不是单词开头或结束的位置\t制表符\Q开始引号:\Q(a+b)*3\E 可匹配文本 “(a+b)*3”。\E结束引号:\Q(a+b)*3\E 可匹配文本 “(a+b)*3”。
正则表达式实例
实例描述.匹配除"\r\n"之外的任何单个字符。[Rr]uby匹配 “Ruby” 或 “ruby”rub[ye]匹配 “ruby” 或 “rube”[aeiou]匹配小写字母 :aeiou[0-9]匹配任何数字,类似 [0123456789][a-z]匹配任何 ASCII 小写字母[A-Z]匹配任何 ASCII 大写字母[a-zA-Z0-9]匹配数字,大小写字母[^aeiou]匹配除了 aeiou 其他字符[^0-9]匹配除了数字的其他字符\d匹配数字,类似: [0-9]\D匹配非数字,类似: [^0-9]\s匹配空格,类似: [ \t\r\n\f]\S匹配非空格,类似: [^ \t\r\n\f]\w匹配字母,数字,下划线,类似: [A-Za-z0-9_]\W匹配非字母,数字,下划线,类似: [^A-Za-z0-9_]ruby?匹配 “rub” 或 “ruby”: y 是可选的ruby*匹配 “rub” 加上 0 个或多个的 y。ruby+匹配 “rub” 加上 1 个或多个的 y。\d{3}刚好匹配 3 个数字。\d{3,}匹配 3 个或多个数字。\d{3,5}匹配 3 个、4 个或 5 个数字。\D\d+无分组: + 重复 \d(\D\d)+/分组: + 重复 \D\d 对([Rr]uby(, )?)+匹配 “Ruby”、“Ruby, ruby, ruby”,等等常用可以应用正则的函数:
.split("")切割字符串
.regexp_extract(string subject, string pattern, int index) 将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符
.regexp_replace(string A, string B, string C) 将字符串A中的符合Java正则表达式B的部分替换为C
.equals("")匹配
5、SQL like与rlike

like为通配符匹配,不是正则
%:匹配零个及多个任意字符
_:与任意单字符匹配
[]:匹配一个范围
[^]:排除一个范围
rlike为正则匹配
regexp与rlike功能相似
参考资料:(15条消息) sparksql 正则匹配总结_Andrew LD-CSDN博客_spark 正则表达式
6、中文数据

关于csv文件若包含中文,可在读取时设置option参数
  1. /**
  2. * 注意option的设置
  3. * 读取本地文件需要加上file:///否则默认读hdfs文件
  4. */
  5. val inputData = spark.sqlContext.read.format("csv")
  6.   .option("sep","\t")
  7.   .option("encoding","GBK")
  8.   .option("header","true")
  9. .load("file:///C:\\Users\\61907\\Desktop\\BigData\\Spark\\sparkDome1\\HiveAndMysql\\src\\main\\resources\\cov19.csv")
复制代码
jdbc读取数据库数据时,若有中文需设置jdbc连接参数
&useUnicode=true&characterEncoding=utf8
  1. //    jdbc中文编码设置
  2.     val mysqlMap = Map(
  3.       "url"->"jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
  4.       "user"->"root",
  5.       "password"->"xxx",
  6.       "driver"->"com.mysql.jdbc.Driver"
  7.     )
复制代码
关于hive中存储中文数据,中文注释,中文分区(索引)
Ⅰ~Ⅲ参考资料:
(16条消息) hive设置中文编码格式utf-8_2020xyz的博客-CSDN博客_hive建表指定编码格式
(16条消息) hive修改使用utf8编码支持中文字符集_那又怎样?的博客-CSDN博客_hive默认字符集编码
Ⅰ.元数据库设置

元数据库需设置为utf-8编码
  1. ##创建hive元数据库hive,并指定utf-8编码格式
  2. mysql>create database hive DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
  3. ##修改已存在的hive元数据库,字符编码格式为utf-8
  4. mysql>alter database hive character set utf8;     
  5. ##进入hive元数据库
  6. mysql>use hive;
  7. ##查看元数据库字符编码格式
  8. mysql>show variables like 'character_set_database';  
复制代码

Ⅱ.相关表设置

1).修改字段注释字符集
  1. mysql>alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
复制代码

2).修改表注释字符集
  1. mysql>alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
复制代码

类似的,PARAM_KEY若需要中文也可设置为utf8

3).修改分区表参数,以支持分区能够用中文表示
  1. mysql>alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
  2. mysql>alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
复制代码


另外,PARTITIONS表中存放分区名的字段也需要修改为utf8
  1. mysql>alter table PARTITIONS modify column PART_name varchar(4000) character set utf8;
复制代码

4).修改索引注解
  1. mysql>alter table INDEX_PARAMS modify column PARAM_VALUE varchar(250) character set utf8;
复制代码

Ⅲ.hive-site.xml配置文件设置

需要在jdbc连接中设置支持中文编码
&useSSL=false&useUnicode=true&characterEncoding=utf8
其中&需要使用&转义
参考资料:(16条消息) 【已解决】The reference to entity “useSSL” must end with the ‘;’ delimiter_清宵尚温的博客-CSDN博客
/opt/hive/conf/hive-site.xml:
  1. <?xml version="1.0" encoding="UTF-8" standalone="no"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4.   <property>
  5.     <name>javax.jdo.option.ConnectionURL</name>
  6.     <value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8</value>
  7.     <description>hive的元数据库 </description>
  8.   </property>
  9.   <property>
  10.     <name>javax.jdo.option.ConnectionDriverName</name>
  11.     <value>com.mysql.jdbc.Driver</value>
  12.     <description>mysql的驱动jar包 </description>
  13.   </property>
  14.   <property>
  15.     <name>javax.jdo.option.ConnectionUserName</name>
  16.     <value>root</value>
  17.     <description>设定数据库的用户名 </description>
  18.   </property>
  19.   <property>
  20.     <name>javax.jdo.option.ConnectionPassword</name>
  21.     <value>xxx</value>
  22.     <description>设定数据库的密码</description>
  23.    </property>
  24. <!--zbt添加-->
  25.    <property>
  26.       <name>hive.exec.max.dynamic.partitions</name>
  27.       <value>100000</value>
  28.       <description>在所有执行MR的节点上,最大一共可以创建多少个动态分区</description>
  29.    </property>
  30.    <property>
  31.       <name>hive.exec.max.dynamic.partitions.pernode</name>
  32.       <value>100000</value>
  33.       <description>在所有执行MR的节点上,最大可以创建多少个动态分区</description>
  34.   </property>
  35. </configuration>
复制代码
Ⅳ.未解决问题

hdfs文件系统中显示

虽然正常显示中文但在文件夹中会出现
Path does not exist on HDFS or WebHDFS is disabled. Please check your path or enable WebHDFS

可能是中文路径导致的错误,但该错误目前未影响到分区表的正常操作,具体影响仍需实验。
Ⅴ.暴力脚本- -

参考资料:(16条消息) hive分区字段含中文导致的报错_一定要努力努力再努力的博客-CSDN博客_hive分区字段是中文
  1. alter database hive_meta default character set utf8;
  2. alter table BUCKETING_COLS default character set utf8;
  3. alter table CDS default character set utf8;
  4. alter table COLUMNS_V2 default character set utf8;
  5. alter table DATABASE_PARAMS default character set utf8;
  6. alter table DBS default character set utf8;
  7. alter table FUNCS default character set utf8;
  8. alter table FUNC_RU default character set utf8;
  9. alter table GLOBAL_PRIVS default character set utf8;
  10. alter table PARTITIONS default character set utf8;
  11. alter table PARTITION_KEYS default character set utf8;
  12. alter table PARTITION_KEY_VALS default character set utf8;
  13. alter table PARTITION_PARAMS default character set utf8;
  14. -- alter table PART_COL_STATS default character set utf8;
  15. alter table ROLES default character set utf8;
  16. alter table SDS default character set utf8;
  17. alter table SD_PARAMS default character set utf8;
  18. alter table SEQUENCE_TABLE default character set utf8;
  19. alter table SERDES default character set utf8;
  20. alter table SERDE_PARAMS default character set utf8;
  21. alter table SKEWED_COL_NAMES default character set utf8;
  22. alter table SKEWED_COL_VALUE_LOC_MAP default character set utf8;
  23. alter table SKEWED_STRING_LIST default character set utf8;
  24. alter table SKEWED_STRING_LIST_VALUES default character set utf8;
  25. alter table SKEWED_VALUES default character set utf8;
  26. alter table SORT_COLS default character set utf8;
  27. alter table TABLE_PARAMS default character set utf8;
  28. alter table TAB_COL_STATS default character set utf8;
  29. alter table TBLS default character set utf8;
  30. alter table VERSION default character set utf8;
  31. alter table BUCKETING_COLS convert to character set utf8;
  32. alter table CDS convert to character set utf8;
  33. alter table COLUMNS_V2 convert to character set utf8;
  34. alter table DATABASE_PARAMS convert to character set utf8;
  35. alter table DBS convert to character set utf8;
  36. alter table FUNCS convert to character set utf8;
  37. alter table FUNC_RU convert to character set utf8;
  38. alter table GLOBAL_PRIVS convert to character set utf8;
  39. alter table PARTITIONS convert to character set utf8;
  40. alter table PARTITION_KEYS convert to character set utf8;
  41. alter table PARTITION_KEY_VALS convert to character set utf8;
  42. alter table PARTITION_PARAMS convert to character set utf8;
  43. -- alter table PART_COL_STATS convert to character set utf8;
  44. alter table ROLES convert to character set utf8;
  45. alter table SDS convert to character set utf8;
  46. alter table SD_PARAMS convert to character set utf8;
  47. alter table SEQUENCE_TABLE convert to character set utf8;
  48. alter table SERDES convert to character set utf8;
  49. alter table SERDE_PARAMS convert to character set utf8;
  50. alter table SKEWED_COL_NAMES convert to character set utf8;
  51. alter table SKEWED_COL_VALUE_LOC_MAP convert to character set utf8;
  52. alter table SKEWED_STRING_LIST convert to character set utf8;
  53. alter table SKEWED_STRING_LIST_VALUES convert to character set utf8;
  54. alter table SKEWED_VALUES convert to character set utf8;
  55. alter table SORT_COLS convert to character set utf8;
  56. alter table TABLE_PARAMS convert to character set utf8;
  57. alter table TAB_COL_STATS convert to character set utf8;
  58. alter table TBLS convert to character set utf8;
  59. alter table VERSION convert to character set utf8;
  60. -- alter table PART_COL_STATS convert to character set utf8;
  61. SET character_set_client = utf8 ;
  62. -- SET character_set_connection = utf8 ;
  63. -- alter table PART_COL_STATS convert to character set utf8;
  64. SET character_set_database = utf8 ;
  65. SET character_set_results = utf8 ;
  66. SET character_set_server = utf8 ;
  67. -- SET collation_connection = utf8 ;
  68. -- SET collation_database = utf8 ;
  69. -- SET collation_server = utf8 ;
  70. SET NAMES 'utf8';
复制代码
只复制了博客中修改表字段的部分
看看就好,最好还是根据需求修改。
Ⅵ.实例

  1. import org.apache.spark.sql.{SaveMode, SparkSession}
  2. object CNHivePartitionTest {
  3.   def main(args: Array[String]): Unit = {
  4.     System.setProperty("hadoop.home.dir", "D:\\BaiduNetdiskDownload\\hadoop-2.7.3")
  5.     System.setProperty("HADOOP_USER_NAME", "root")
  6.     val spark = SparkSession.builder()
  7.       .appName("Cov19DataDome4")
  8.       .master("local[*]")
  9.       .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
  10.       .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
  11.       .enableHiveSupport()
  12.       .getOrCreate()
  13.     import spark.implicits._
  14.     import org.apache.spark.sql.functions._
  15.     spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
  16.     spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
  17.     val mysqlMap = Map(
  18.       "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
  19.       "user" -> "root",
  20.       "password" -> "xxx",
  21.       "driver" -> "com.mysql.jdbc.Driver"
  22.     )
  23.     val mysqlData = spark.read.format("jdbc")
  24.       .options(mysqlMap)
  25.       .option("dbtable","tc_hotel2")
  26.       .load()
  27.     spark.sqlContext.sql(
  28.       """
  29.         |create table if not exists clown_test_db.CNTest
  30.         |(
  31.         |  `hname` string,
  32.         |  `hbrand` string,
  33.         |  `province` string,
  34.         |  `city` string,
  35.         |  `starlevel` string,
  36.         |  `rating` string,
  37.         |  `comment_count` string,
  38.         |  `price` string
  39.         |)
  40.         |partitioned by (pid string)
  41.         |row format delimited
  42.         |fields terminated by '\t'
  43.         |lines terminated by '\n'
  44.         |stored as textfile
  45.         |""".stripMargin)
  46.     mysqlData
  47.       .select(col("*"),col("province"))
  48.       .write
  49.       .format("hive")
  50.       .mode(SaveMode.Append)
  51.       .insertInto("clown_test_db.CNTest")
  52.   }
  53. }
复制代码
7、表连接join/union

参考资料:https://blog.caogenba.net/m0_37809146/article/details/91282446

  1. val tb1 = spark.read.format("jdbc")
  2.   .options(mysqlMap)
  3.   .option("dbtable", "cov19_test_tb")
  4.   .load()
  5. val tb2 = spark.read.format("jdbc")
  6.   .options(mysqlMap)
  7.   .option("dbtable", "cov19_test_tb_2")
  8.   .load()
  9.   .withColumnRenamed("", "")
  10. /**
  11. * inner 交集,只会联合给出字段都存在的数据
  12. */
  13. tb1.join(tb2, Seq("provinceName", "cityName"), "inner")
  14. //      .show(100)
  15. /**
  16. * right 右链接,展示右边表所有数据
  17. */
  18. tb1.join(tb2, Seq("provinceName", "cityName"), "right")
  19. //      .show(100)
  20. /**
  21. * left 左链接,展示左边表所有数据
  22. */
  23. tb1.join(tb2, Seq("provinceName", "cityName"), "left")
  24. //      .show(100)
  25. val testTb1 = tb1.withColumnRenamed("cityName", "tb1CN")
  26. val testTb2 = tb2.withColumnRenamed("cityName", "tb1CN")
  27. //默认 inner连接,进行连接的条件字段必须两边表都存在
  28. testTb1.join(testTb2, "tb1CN")
  29. //      .show()
  30. /**
  31. * right_outer 右外连接,相当于左连接
  32. */
  33. tb1.join(tb2, Seq("provinceName", "cityName"), "right_outer")
  34. //      .show(100)
  35. /**
  36. * left_outer 左外连接,相当于右连接
  37. */
  38. tb1.join(tb2, Seq("provinceName", "cityName"), "left_outer")
  39. //      .show(100)
  40. /**
  41. * 外连接 类似把左右连接出的集合加起来- -
  42. */
  43. tb1.join(tb2, Seq("provinceName", "cityName"), "outer")
  44. //      .show(100)
  45. /**
  46. * 全连接
  47. */
  48. tb1.join(tb2, Seq("provinceName", "cityName"), "full")
  49. //      .show(100)
  50. /**
  51. * 全外连接
  52. */
  53. tb1.join(tb2, Seq("provinceName", "cityName"), "full_outer")
  54. //      .show(100)
  55. /**
  56. * 交集
  57. */
  58. tb1.join(tb2, Seq("provinceName", "cityName"), "left_semi")
  59.   .show(100)
  60. /**
  61. * 差集
  62. */
  63. tb1.join(tb2, Seq("provinceName", "cityName"), "left_anti")
  64.   .show(100)
  65. /**
  66. * https://blog.caogenba.net/wcc27857285/article/details/86439313
  67. * 其他知识点:
  68. * HAVING 子句
  69. * 在 SQL 中增加 HAVING 子句原因是,WHERE 关键字无法与聚合函数一起使用。
  70. *
  71. * SQL HAVING 语法
  72. * SELECT column_name, aggregate_function(column_name)
  73. * FROM table_name
  74. * WHERE column_name operator value
  75. * GROUP BY column_name
  76. * HAVING aggregate_function(column_name) operator value
  77. *
  78. *
  79. * --- JOIN ON
  80. * JOIN写连接字段
  81. * ON写匹配条件
  82. *
  83. */
复制代码
8、自定义UDF,UDAF函数

Spark 2.4.0编程指南–Spark SQL UDF和UDAF-阿里云开发者社区 (aliyun.com)
(17条消息) Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator_L-CSDN博客
(17条消息) UDF和UDAF,UDTF的区别_山海-CSDN博客_udf和udtf区别
[(17条消息) Spark] 自定义函数 udf & pandas_udf_風の唄を聴け的博客-CSDN博客_pandas spark udf
9、数据集获取

UCI机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/
Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Craw网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/
Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从
Competitions区域下载:http://www.kaggle.com/competitions
KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html
10、数仓分层概念

参考资料:(10条消息) 数据仓库–数据分层(ETL、ODS、DW、APP、DIM)_hello_java_lcl的博客-CSDN博客_dim层

五、实战复盘

1、2022/1/3

题目:


数据源:

csv文件(未修改)

mysql表格(增加脏数据)

环境准备:

1.mysql数据表格 2.hive目标表 3.pom文件
完成速度:

3h+
遇到问题:

1.data->mysql,数据保存

SaveMode.Overwrite 保存至mysql数据库,不仅会覆盖数据格式,字段名也会被覆盖
在做题途中遇到了保存SaveMode.Append失败的错误,修改为Overwrite 不报错,原因不明
是否解决:
出现错误
  1. <code>Unknown column 'sum' in 'field list'
复制代码
原因是字段名与mysql数据库目标表中的字段名不同
修改字段名相同即可
  1. .withColumnRenamed("sum","total_price")
复制代码
在hive中是否有相同特性?
2.Join等表连接的使用

Join,union仍不熟悉 select子查询也比较生疏
是否解决: ✔?
join理解下图足够

union联合要求字段相同 否则报错
3.Date计算

参考资料:https://blog.caogenba.net/wybshyy/article/details/52064337

使用datediff不需要转换时间格式
是否解决:
参考资料:
(18条消息) Spark-SQL常用内置日期时间函数_绿萝蔓蔓绕枝生-CSDN博客_sparksql 时间函数
(18条消息) sparksql 时间函数_OH LEI``-CSDN博客_sparksql时间函数
datediff 计算两个时间差天数 结果返回一个整数
对时间格式可能有要求例如‘2021/1/4‘这样的时间格式无法被计算(sql中,算子貌似没有这个问题)
sql写法:
  1. spark.sql(
  2.   """
  3.     |select datediff('2021-1-4','2020-12-30')
  4.     |""".stripMargin).show()
复制代码
算子写法:
  1. .withColumn("o",datediff(col("delivery_date"),col("order_date")))
复制代码
months_between计算两个时间差月数 结果返回一个浮点数
sql写法:
  1. spark.sql(
  2.   """
  3.     |select months_between('2021-1-4','2020-12-30')
  4.     |""".stripMargin).show()
复制代码
返回:0.16129032
若想返回整数月份可以将天数删除:
  1. spark.sql(
  2.   """
  3.     |select months_between('2021-1','2020-12')
  4.     |""".stripMargin).show()
复制代码
返回:1.0
算子写法:
  1. .withColumn("o",months_between(col("delivery_date"),col("order_date")))
复制代码
直接用时间戳相减通过计算也可以
  1. spark.sql(
  2.   """
  3.     |select (unix_timestamp('2022/1/1','yyyy/MM/dd') - unix_timestamp('2021/12/31','yyyy/MM/dd'))/60/60/24
  4.     |""".stripMargin).show()
复制代码
来源:https://blog.caogenba.net/Clown_34/article/details/122421267
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作