更新时间:2021年03月23日17时09分 来源:传智教育 浏览次数:
在Windows系统下开发Scala代码,可以使用本地环境测试,因此我们首先需要在本地磁盘准备文本数据文件,这里将HDFS中的/spark/person.txt文件下载到本地D:/spark/person.txt路径下。从文件4-1可以看出,当前数据文件共3列,我们可以非常容易的分析出,这三列分别是编号、姓名、年龄。但是计算机无法像人一样直观的感受字段的实际含义,因此我们需要通过反射机制来推断包含特定类型对象的Schema信息。
接下来我们打开IDEA开发工具,创建名为“spark_chapter04”的Maven工程,讲解实现反射机制推断Schema的开发流程。
1.添加Spark SQL依赖
在pom.xml文件中添加Spark SQL依赖,代码片段如下所示。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
2.编写代码
实现反射机制推断Schema需要定义一个case class样例类,定义字段和属性,样例类的参数名称会被利用反射机制作为列名,编写代码如文件1所示。
文件1 CaseClassSchema.scala
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} //定义样例类 case class Person(id:Int,name:String,age:Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //1.构建SparkSession val spark : SparkSession = SparkSession.builder() .appName("CaseClassSchema ") .master("local[2]") .getOrCreate() //2.获取SparkContext val sc : SparkContext =spark.sparkContext //设置日志打印级别 sc.setLogLevel("WARN") //3.读取文件 val data: RDD[Array[String]] = sc.textFile("D://spark//person.txt").map(x=>x.split(" ")) //4.将RDD与样例类关联 val personRdd: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5.获取DataFrame //手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRdd.toDF //------------DSL风格操作开始------------- //1.显示DataFrame的数据,默认显示20行 personDF.show() //2.显示DataFrame的schema信息 personDF.printSchema() //3.统计DataFrame中年龄大于30的人数 println(personDF.filter($"age">30).count()) //-----------DSL风格操作结束------------- //-----------SQL风格操作开始------------- //将DataFrame注册成表 personDF.createOrReplaceTempView("t_person") spark.sql("select * from t_person").show() spark.sql("select * from t_person where name='zhangsan'").show() //-----------SQL风格操作结束------------- //关闭资源操作 sc.stop() spark.stop() }