
DataFrame Overview and Usage - 粤嵌教育

Updated: 2018-09-06 18:30:26  Source: java大数据  Views: 6150

1. Overview:
A DataFrame is a distributed dataset that can be thought of as a table in a relational database: values are organized by column, each column having a name and a type. It is supported in four languages (Scala, Java, Python, and R). In the Scala API, DataFrame = Dataset[Row].
Note: the DataFrame API appeared in Spark 1.3 (before 1.3 it was called SchemaRDD); Dataset was added in Spark 1.6.

2. DataFrame vs RDD:

Concept: both are distributed collections. A DataFrame can be thought of as a table: besides the data an RDD holds, it also carries a schema, and it supports complex column types (map, etc.).

API: a DataFrame offers a richer API than an RDD, while still supporting map, filter, flatMap, and so on.

Data structure: an RDD knows its element type but has no column structure; a DataFrame carries schema information, which enables optimization and better performance.

Execution: RDD programs written in the Java/Scala API run directly on the JVM, whereas Spark SQL turns a DataFrame query into a logical plan and then a physical plan, optimizing in between, so the performance difference can be large.


3. Working with JSON files

[hadoop@hadoop001 bin]$./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.34-bin.jar 
-- Read a JSON file
scala>val df = spark.read.json("file:///home/hadoop/data/people.json")
18/09/02 11:47:20 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
-- Print the schema

scala> df.printSchema
root
 |-- age: long (nullable = true)    -- column name, type, nullable
 |-- name: string (nullable = true)
-- Show the contents

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

-- Select a single column

scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
-- Single quotes work because of an implicit conversion (Symbol to Column)

scala> df.select('name).show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
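The single-quote form works because `'name` is a Scala Symbol, which Spark's implicits (from `import spark.implicits._`) convert into a Column. A minimal plain-Scala sketch of what a Symbol is, no Spark required (note that the `'name` literal syntax is deprecated in Scala 2.13; `Symbol("name")` is the equivalent spelling):

```scala
// 'name in the REPL denotes a Scala Symbol; Symbol("name") is the same value.
val col = Symbol("name")
// .name recovers the underlying string; Spark uses this to look up the column.
println(col.name)  // prints "name"
```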

-- A missing closing double quote is a syntax error (df.select("name").show itself is valid)
scala> df.select("name).show
<console>:1: error: unclosed string literal
df.select("name).show
          ^
-- Arithmetic on age; NULL cannot be computed with and stays NULL

scala> df.select($"name",$"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
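The null result for Michael mirrors SQL three-valued logic: arithmetic with NULL yields NULL. A plain-Scala analogy using `Option` (an illustration only, not the Spark API):

```scala
// Ages as in the example: Michael has no age, Andy is 30, Justin is 19.
val ages: Seq[Option[Int]] = Seq(None, Some(30), Some(19))
// Adding 1 inside map leaves the missing value missing, like NULL + 1 = NULL.
val plusOne = ages.map(_.map(_ + 1))
println(plusOne)  // List(None, Some(31), Some(20))
```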
-- Filter on age

scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
-- Group by age and count

scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
-- Create a temporary view
scala>  df.createOrReplaceTempView("people")

scala>spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


4. Actions on a DataFrame object

-- Define a case class to provide the schema
case class Student(id:String,name:String,phone:String,Email:String)
-- RDD to DataFrame via reflection
val students = sc.textFile("file:///home/hadoop/data/student.data").map(_.split("\\|")).map(x=>Student(x(0),x(1),x(2),x(3))).toDF()
-- Print the schema
students.printSchema
-- show(numRows: Int, truncate: Boolean)
-- numRows defaults to the first 20 rows; truncate=true cuts strings to 20 characters
-- students.show(5,false) shows the first 5 rows without truncating strings
scala> students.show

+---+--------+--------------+--------------------+
| id|    name|         phone|               Email|
+---+--------+--------------+--------------------+
|  1|   Burke|1-300-746-8446|ullamcorper.velit...|
|  2|   Kamal|1-668-571-5046|pede.Suspendisse@...|
|  3|    Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|   Belle|1-246-894-6340|vitae.aliquet.nec...|
|  5|  Trevor|1-300-527-4967|dapibus.id@acturp...|
|  6|  Laurel|1-691-379-9921|adipiscing@consec...|
|  7|    Sara|1-608-140-1995|Donec.nibh@enimEt...|
|  8|  Kaseem|1-881-586-2689|cursus.et.magna@e...|
|  9|     Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
| 10|    Maya|1-271-683-2698|accumsan.convalli...|
| 11|     Emi|1-467-270-1337|est@nunc.com|.......|
| 12|   Caleb|1-683-212-0896|Suspendisse@Quisq...|
| 13|Florence|1-603-575-2444|sit.amet.dapibus@...|
| 14|   Anika|1-856-828-7883|euismod@ligulaeli...|
| 15|   Tarik|1-398-171-2268|turpis@felisorci.com|
| 16|   Amena|1-878-250-3129|lorem.luctus.ut@s...|
| 17| Blossom|1-154-406-9596|Nunc.commodo.auct...|
| 18|     Guy|1-869-521-3230|senectus.et.netus...|
| 19| Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
| 20|  Edward|1-711-710-6552|lectus@aliquetlib...|
+---+--------+--------------+--------------------+
only showing top 20 rows
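Note that `String.split` takes a regular expression, so the pipe delimiter must be escaped as `"\\|"`; an unescaped `"\|"` is not a valid Scala string literal. A standalone sketch of the per-line parsing (sample line taken from the output above):

```scala
case class Student(id: String, name: String, phone: String, email: String)

val line = "1|Burke|1-300-746-8446|ullamcorper.velit.in@ametnullaDonec.co.uk"
// split takes a regex, so the '|' metacharacter must be escaped
val parts = line.split("\\|")
val s = Student(parts(0), parts(1), parts(2), parts(3))
println(s.name)   // prints "Burke"
```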

-- students.head(5) returns the first rows as an Array[Row]


scala> students.head(5).foreach(println)
[1,Burke,1-300-746-8446,ullamcorper.velit.in@ametnullaDonec.co.uk]
[2,Kamal,1-668-571-5046,pede.Suspendisse@interdumenim.edu]
[3,Olga,1-956-311-1686,Aenean.eget.metus@dictumcursusNunc.edu]
[4,Belle,1-246-894-6340,vitae.aliquet.nec@neque.co.uk]
[5,Trevor,1-300-527-4967,dapibus.id@acturpisegestas.net]
-- Select specific columns

scala> students.select("id","name").show(5)
+---+------+
| id|  name|
+---+------+
|  1| Burke|
|  2| Kamal|
|  3|  Olga|
|  4| Belle|
|  5|Trevor|
+---+------+
-- Give a column an alias
scala> students.select($"name".as("new_name")).show(5)

+--------+
|new_name|
+--------+
|   Burke|
|   Kamal|
|    Olga|
|   Belle|
|  Trevor|
+--------+
-- Rows with id greater than 5
scala> students.filter("id>5").show(5)

+---+------+--------------+--------------------+
| id|  name|         phone|               Email|
+---+------+--------------+--------------------+
|  6|Laurel|1-691-379-9921|adipiscing@consec...|
|  7|  Sara|1-608-140-1995|Donec.nibh@enimEt...|
|  8|Kaseem|1-881-586-2689|cursus.et.magna@e...|
|  9|   Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
| 10|  Maya|1-271-683-2698|accumsan.convalli...|
+---+------+--------------+--------------------+
-- Rows whose name is empty or the literal string 'NULL' (filter = where)
scala> students.filter("name=''or name='NULL'").show(false)

+---+----+--------------+--------------------------+
|id |name|phone         |Email                     |
+---+----+--------------+--------------------------+
|21 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
|22 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
|23 |NULL|1-711-710-6552|lectus@aliquetlibero.co.uk|
+---+----+--------------+--------------------------+
-- id greater than 5 and name matching a pattern
scala> students.filter("id>5 and name like 'M%'").show(5)

+---+-------+--------------+--------------------+
| id|   name|         phone|               Email|
+---+-------+--------------+--------------------+
| 10|   Maya|1-271-683-2698|accumsan.convalli...|
| 19|Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
+---+-------+--------------+--------------------+
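`like 'M%'` keeps names beginning with M. The same predicate over a plain Scala collection (an analogy only, not the Spark API; names taken from the table above):

```scala
// like 'M%' corresponds to startsWith("M") on each name
val names = Seq("Maya", "Malachi", "Lev", "Sara")
val mNames = names.filter(_.startsWith("M"))
println(mNames)  // List(Maya, Malachi)
```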
-- Sort by name ascending, excluding empty names
scala> students.sort($"name").select("id","name").filter("name <> ''").show(3)

+---+-----+
| id| name|
+---+-----+
| 16|Amena|
| 14|Anika|
|  4|Belle|
+---+-----+
-- Sort by name descending (sort = orderBy)
scala> students.sort($"name".desc).select("name").show(5)

+------+
|  name|
+------+
|Trevor|
| Tarik|
|  Sara|
|  Olga|
|  NULL|
+------+
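In plain Scala terms, a descending sort is an ascending sort under the reversed ordering. A local-collection analogy of `sort($"name".desc)` (illustration only, not the Spark API):

```scala
val names = Seq("Michael", "Andy", "Justin")
// descending sort = ascending sort with the reverse Ordering
val desc = names.sorted(Ordering[String].reverse)
println(desc)  // List(Michael, Justin, Andy)
```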
-- Using aggregate functions
scala> students.agg("id" -> "max", "id" -> "sum").show(false)

+-------+-------+
|max(id)|sum(id)|
+-------+-------+
|9      |276.0  |
+-------+-------+
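The output looks odd at first: `max(id)` is 9, yet `sum(id)` is 276.0. The reason is that `id` was declared as a String in the case class, so `max` compares lexicographically (where "9" sorts after "23"), while `sum` casts the values to a numeric type. Assuming the file holds 23 rows with ids 1 to 23 (consistent with rows 21 to 23 shown in the empty-name query above, and with the sum 276), plain Scala reproduces both values:

```scala
// String ids 1..23, as in the Student case class where id is a String
val ids = (1 to 23).map(_.toString)
// String max is lexicographic: "9" > "23"
val lexMax = ids.max
// Numeric sum after casting, as sum does for a string column
val numSum = ids.map(_.toDouble).sum
println(lexMax)  // 9
println(numSum)  // 276.0
```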
-- join in USING mode: a Seq names the columns shared by both sides (students2 is a second DataFrame)
students.join(students2, Seq("id", "name"), "inner")
-- Supported join types: inner, outer, left_outer, right_outer, leftsemi
-- Alternatively, give the join condition and the join type explicitly

students.join(students2, students("id") === students2("t1_id"), "inner")
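As an illustration of what an inner join computes, here is the same idea over plain Scala collections (not the Spark API; the right-hand values are hypothetical sample data, loosely based on the table above):

```scala
// Two small "tables" keyed by id
val left  = Seq((1, "Burke"), (2, "Kamal"), (3, "Olga"))
val right = Seq((2, "1-668-571-5046"), (3, "1-956-311-1686"), (4, "unknown"))
// Inner join: keep only keys present on both sides
val rightMap = right.toMap
val joined = left.collect {
  case (id, name) if rightMap.contains(id) => (id, name, rightMap(id))
}
println(joined)  // List((2,Kamal,1-668-571-5046), (3,Olga,1-956-311-1686))
```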

5. File operations with the DataFrame API
1. Maven dependencies
<spark.version>2.3.1</spark.version>

<!-- Spark Core dependency -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

<!-- Spark SQL dependency -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

2. Implementation in IDEA:
package com.zrc.ruozedata.sparkSQL

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkSQL001 extends App {
      /*
       * (1) RDD to DataFrame via reflection:
       * create an RDD, then use a case class as the schema
       * to parse each line of the input text.
       */
      val spark = SparkSession.builder()
            .master("local[2]")
            .appName("SparkSQL001")
            .getOrCreate() // add enableHiveSupport() here to work with Hive
      val infos = spark.sparkContext.textFile("file:///F:/infos.txt")

      /*
      import spark.implicits._
      val infoDF = infos.map(_.split(",")).map(x => Info(x(0).toInt, x(1), x(2).toInt)).toDF()
      infoDF.show()
      */

      /*
       * (2) RDD to DataFrame via StructType:
       * a StructType is built from StructFields, each carrying a name and a dataType;
       * the schema and the RDD[Row] are combined with createDataFrame.
       */
      // note: values going into a Row must be converted to the declared types
      val rows = infos.map(_.split(",")).map(x => Row(x(0).trim.toInt, x(1), x(2).trim.toInt))
      val schema = StructType(
            Array(
                  StructField("id", IntegerType, true),
                  StructField("name", StringType, true),
                  StructField("age", IntegerType, true)
            )
      )
      val infoDF = spark.createDataFrame(rows, schema)
      infoDF.show()
      spark.stop()
}
// case class Info(id: Int, name: String, age: Int)
