更新時間:2021-09-08 來源:黑馬程序員 瀏覽量:
當(dāng)case類不能提前定義的時候,就需要采用編程方式定義Schema信息,定義DataFrame主要包含3個步驟,具體如下:
(1)創(chuàng)建一個Row對象結(jié)構(gòu)的RDD;
(2)基于StructType類型創(chuàng)建Schema;
(3)通過SparkSession提供的createDataFrame()方法來拼接Schema。
根據(jù)上述步驟,創(chuàng)建SparkSqlSchema. scala文件,使用編程方式定義Schema信息的具體代碼如文件4-3所示。
文件4-3 SparkSqlSchema.scala
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sq1.types. {IntegerType,StringType,StructField,StructType} import org.apache.spark.sql.(DataFrame,Row,Sparkession) object SparkSqlSchema { def main(args: Array[string]): Unit=( //1.創(chuàng)建SparkSession val spark: sparkSession=Sparksession.bullder() .appName ("SparkSq1Schema") .master ("1oca1[2]") .getOrCreate () //2.獲取sparkConttext對象 val sc: SparkContext=spark.sparkContext //設(shè)置日志打印級別 sc.setLogLevel ( "WARN") //3.加載數(shù)據(jù) val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt") //4.切分每一行 val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" ")) //5.加載數(shù)據(jù)到Row對象中 val personRDD:RDD[Row]= dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //6.創(chuàng)建Schema val schema:StructType=StructType(Seq( StructField("id",IntegerType,false), StructField("name",StringType,false), StructField("age", IntegerType, false) )) //7.利用personRDD與Schema創(chuàng)建DataFrame val personDF:DataFrame=spark.createDataFrame(personRDD,schema) //8.DSL操作顯示DataFrame的數(shù)據(jù)結(jié)果 personDF . show () //9.將DataFrame注冊成表 personDF.createOrReplaceTempView ("t_person") //10.sq1語句操作 spark.sq1 ("select¥from t_ person") .show() //11.關(guān)閉資源 sc.stop() spark.stop ()
在文件4-3中,第9~23行代碼表示將文件轉(zhuǎn)換成為RDD的基本步驟,第25~29行代碼即為編程方式定義Schema的核心代碼,Spark SQL提供了Class StructType( val fields:Array[StructField])類來表示模式信息,生成一個StructType對象,需要提供fields作為輸入?yún)?shù),fields是個集合類型,StructField(name,dataTypenullable)參數(shù)分別表示為字段名稱、字段數(shù)據(jù)類型、字段值是否允許為空值,根據(jù)person.txt文本數(shù)據(jù)文件分別設(shè)置id、name、age字段作為Schema,第31行代碼表示通過調(diào)用spark.createDataFrame()方法將RDD和Schema進行合并轉(zhuǎn)換為DataFrame,第33~40行代碼即為操作DataFrame進行數(shù)據(jù)查詢。