教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

多种方法创建DataFrame【大数据技术文章】

更新时间:2021年03月23日14时14分 来源:传智教育 浏览次数:

在Spark2.0版本之前,Spark SQL中的SQLContext是创建DataFrame和执行SQL的入口,我们可以利用HiveContext接口,通过HiveQL语句操作Hive表数据,实现数据查询功能。而在Spark2.0之后,Spark使用全新的SparkSession接口替代SQLContext及HiveContext接口完成数据的加载、转换、处理等功能。

创建SparkSession对象可以通过“SparkSession.builder().getOrCreate()”方法获取,但当我们使用Spark-Shell编写程序时,Spark-Shell客户端会默认提供了一个名为“sc”的SparkContext对象和一个名为“spark”的SparkSession对象,因此我们可以直接使用这两个对象,不需要自行创建。启动Spark-Shell命令如下所示。

$ spark-shell --master local[2]

在启动Spark-Shell完成后,效果如图1所示。

DataFrame的创建方法【大数据文章】

图1 启动Spark-Shell

在图1中可以看出,SparkContext、SparkSession对象已创建完成。创建DataFrame有多种方式,最基本的方式是从一个已经存在的RDD调用toDF()方法进行转换得到DataFrame,或者通过Spark读取数据源直接创建。

在创建DataFrame之前,为了支持RDD转换成DataFrame及后续的SQL操作,需要导入spark.implicits._包启用隐式转换。若使用SparkSession方式创建DataFrame,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,具体操作API如表1所示。

表1 spark.read操作

代码示例 描述
spark.read.text("people.txt") 读取txt格式的文本文件,创建DataFrame                                 
spark.read.csv ("people.csv") 读取csv格式的文本文件,创建DataFrame
spark.read.json("people.json") 读取json格式的文本文件,创建DataFrame
spark.read.parquet("people.parquet") 读取parquet格式的文本文件,创建DataFrame

1.数据准备

在HDFS文件系统中的/spark目录中有一个person.txt文件,内容如文件1所示。

文件1 person.txt

 zhangsan 20
 lisi 29
 wangwu 25
 zhaoliu 30
 tianqi 35
 jerry 40

2.通过文件直接创建DataFrame

我们通过Spark读取数据源的方式进行创建DataFrame,在Spark-Shell输入下列代码:

scala > val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: String]
scala > personDF.printSchema()
root
 |-- value: String (Nullable = true)

从上述返回结果personDF的属性可以看出,创建DataFrame对象完成,之后调用DataFrame的printSchema()方法可以打印当前对象的Schema元数据信息。从返回结果可以看出,当前value字段是String数据类型,并且还可以为Null。

使用DataFrame的show()方法可以查看当前DataFrame的结果数据,具体代码和返回结果如下所示。

scala > personDF.show()
+-------------+                          
|   value   |
+-------------+
|1 zhangsan 20|
|2 lisi    29|
|3 wangwu  25|
|4 zhaoliu 30|
|5 tianqi  35|
|6 jerry  40|
+-------------+

从上述返回结果看出,当前personDF对象中的6条记录就对应了person.txt文本文件中的数据。

3.RDD转换DataFrame

调用RDD的toDF()方法,可以将RDD转换为DataFrame对象,具体代码如下所示。

   scala > val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
   lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at
   map at <console>:24
   scala > case class Person(id:Int,name:String,age:Int)
   defined class Person
   scala > val personRDD = 
lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
   personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map
   at <console>:27
   scala > val personDF = personRDD.toDF()
  personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more
  field]
  scala > personDF.show
  +----+--------+----+
  | id |  name | age|
  +----+--------+----+
  | 1 |zhangsan | 20|
  | 2 |lisi   |  29|
  | 3 |wangwu  |  25|
  | 4 |zhaoliu |  30|
  | 5 |tianqi  |  35|
  | 6 |jerry  |  40|
  +----+--------+----+
  scala > personDF.printSchema
  root
   |-- id: integer (nullable = false)
   |-- name: string (nullable = true)
   |-- age: integer (nullable = false)

在上述代码中,第1行代码将文本文件转换成RDD,第4行代码定义Person样例类,相当于定义表的Schema元数据信息,第6行代码表示使RDD中的数组数据与样例类进行关联,最终会将RDD[Array[String]]更改为RDD[Person],第9行代码表示调用RDD的toDF()方法,就可以把RDD转换成了DataFrame了。第12-27行代码表示调用DataFrame方法并从返回结果可以看出,RDD对象成功转换DataFrame。




猜你喜欢:

Redis、传统数据库、HBase以及Hive的区别

怎样安装和配置Sqoop?

DataFrame是什么意思?与RDD相比有哪些优点?

大数据Hadoop生态圈包含哪些子系统?

传智教育大数据项目开发培训

0 分享到:
和我们在线交谈!