更新時(shí)間:2020-12-21 來(lái)源:黑馬程序員 瀏覽量:
RDD處理過(guò)程中的“轉(zhuǎn)換”操作主要用于根據(jù)已有RDD創(chuàng)建新的RDD,每一次通過(guò)Transformation算子計(jì)算后都會(huì)返回一個(gè)新RDD,供給下一個(gè)轉(zhuǎn)換算子使用。下面,通過(guò)一張表來(lái)列舉一些常用轉(zhuǎn)換算子操作的API,如表1所示。
表1 常用的轉(zhuǎn)換算子API
下面,我們通過(guò)結(jié)合具體的示例對(duì)這些轉(zhuǎn)換算子API進(jìn)行詳細(xì)講解。
·filter(func)
filter(func)操作會(huì)篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集。假設(shè),有一個(gè)文件test.txt(內(nèi)容如文件3-1),下面,通過(guò)一張圖來(lái)描述如何通過(guò)filter算子操作,篩選出包含單詞“spark”的元素,具體過(guò)程如圖1所示。
圖1 filter算子操作
在圖1中,通過(guò)從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過(guò)filter操作篩選出滿足條件的元素,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:
scala> val lines = sc.textFile("file:///export/data/test.txt") lines: org.apache.spark.rdd.RDD[String] = `[file:///export/data/test.txt]`(file:///\\export\data\test.txt) MapPartitionsRDD[1] at textFile at <console>:24 scala> val linesWithSpark = lines.filter(line => line.contains("spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25
在上述代碼中,filter()輸入的參數(shù)line => line.contains(“spark”)是一個(gè)匿名函數(shù),其含義是依次取出lines這個(gè)RDD中的每一個(gè)元素,對(duì)于當(dāng)前取到的元素,把它賦值給匿名函數(shù)中的line變量。若line中包含“spark”單詞,就把這個(gè)元素加入到RDD(即linesWithSpark)中,否則就丟棄該元素。
·map(func)
map(func)操作將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集。假設(shè),有一個(gè)文件test.txt(內(nèi)容如文件1),接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)map算子操作把文件內(nèi)容拆分成一個(gè)個(gè)的單詞并封裝在數(shù)組對(duì)象中,具體過(guò)程如圖2所示。
圖2 map算子操作
在圖2中,通過(guò)從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過(guò)map操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:
scala> val lines = sc.textFile("file:///export/data/test.txt") lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt) MapPartitionsRDD[4] at textFile at <console>:24 scala> val words = lines.map(line => line.split(" ")) words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[13] at map at <console>:25
上述代碼中,lines.map(line => line.split(“ ”))含義是依次取出lines這個(gè)RDD中的每個(gè)元素,對(duì)于當(dāng)前取到的元素,把它賦值給匿名函數(shù)中的line變量。由于line是一行文本,如“hadoop spark”,一行文本中包含多個(gè)單詞,且空格進(jìn)行分隔,通過(guò)line.split(“ ”)匿名函數(shù),將文本分成一個(gè)個(gè)的單詞,拆分后得到的單詞都被封裝到一個(gè)數(shù)組對(duì)象中,成為新的RDD(即words)的一個(gè)元素。
·flatMap(func)
flatMap(func)與map(func)相似,但是每個(gè)輸入的元素都可以映射到0或者多個(gè)輸出的結(jié)果。有一個(gè)文件test.txt(內(nèi)容如文件3-1),接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)flatMap算子操作,把文件內(nèi)容拆分成一個(gè)個(gè)的單詞,具體過(guò)程如圖3所示。
圖3 flatMap算子操作
在圖3中,通過(guò)從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過(guò)flatMap操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:
scala> val lines = sc.textFile("file:///export/data/test.txt") lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt) MapPartitionsRDD[5] at textFile at <console>:24 scala> val words = lines.flatMap(line => line.split(" ")) words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:25
在上述代碼中,lines. flatMap(line => line.split(“ ”))等價(jià)于先執(zhí)行l(wèi)ines.map(line => line.split(“ ”))操作(請(qǐng)參考map(func)操作),再執(zhí)行flat()操作(即扁平化操作),把wordArray中的每個(gè)RDD都扁平成多個(gè)元素,被扁平后得到的元素構(gòu)成一個(gè)新的RDD(即words)。
groupByKey()
groupByKey()主要用于(Key,Value)鍵值對(duì)的數(shù)據(jù)集,將具有相同Key的Value進(jìn)行分組,會(huì)返回一個(gè)新的(Key,Iterable)形式的數(shù)據(jù)集。同樣以文件test.txt為例,接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)groupByKey算子操作,將文件內(nèi)容中的所有單詞進(jìn)行分組,具體過(guò)程如圖4示。
圖4 groupByKey算子操作
在圖4中,通過(guò)groupByKey操作把(Key,Value)鍵值對(duì)類型的RDD,按單詞將單詞出現(xiàn)的次數(shù)進(jìn)行分組,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:
scala> val lines = sc.textFile("file:///export/data/test.txt") lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt) MapPartitionsRDD[6] at textFile at <console>:24 scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1)) words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:25 scala> val groupWords=words.groupByKey() groupWords: org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[16] at groupByKey at <console>:25
上述代碼中,words.groupByKey()操作執(zhí)行后,RDD中所有的Key相同的Value都被合并到一起。例如,(“spark”,1)、(“spark”,1)、(“spark”,1)這三個(gè)鍵值對(duì)的Key都是“spark”,合并后得到新的鍵值對(duì)(“spark”,(1,1,1))。
reduceByKey(func)
reduceByKey()主要用于(Key,Value)鍵值對(duì)的數(shù)據(jù)集,返回的是一個(gè)新的(Key,Iterable)形式的數(shù)據(jù)集,該數(shù)據(jù)集是每個(gè)Key傳遞給函數(shù)func進(jìn)行聚合運(yùn)算后得到的結(jié)果。同樣以文件test.txt(內(nèi)容如文件3-1),接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)reduceByKey算子操作統(tǒng)計(jì)單詞出現(xiàn)的次數(shù),具體操作如圖5所示。
圖5 reduceByKey()算子操作
在圖5中,通過(guò)reduceByKey操作把(Key,Value)鍵值對(duì)類型的RDD,按單詞Key將單詞出現(xiàn)的次數(shù)Value進(jìn)行聚合,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:
scala> val lines = sc.textFile("file:///export/data/test.txt") lines: org.apache.spark.rdd.RDD[String] = [file:///export/data/test.txt](file:///\\export\data\test.txt) MapPartitionsRDD[7] at textFile at <console>:24 scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1)) words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at map at <console>:25 scala> val reduceWords=words.reduceByKey((a,b)=>a+b) reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at reduceByKey at <console>:25
上述代碼中,執(zhí)行words.reduceByKey((a,b) => a +
b)操作,共分為兩個(gè)步驟,分別是先執(zhí)行reduceByKey()操作,將所有Key相同的Value值合并到一起,生成一個(gè)新的鍵值對(duì)(例如(“spark”,(1,1,1)));然后執(zhí)行函數(shù)func的操作,即使用(a,b)=>
a + b函數(shù)把(1,1,1)進(jìn)行聚合求和,得到最終的結(jié)果,即(“spark”,3)。
猜你喜歡
Scala在使用聲明值和變量時(shí)的注意事項(xiàng)
Spark Shell讀取HDFS文件的步驟有哪些?
2020-12-21Scala的控制結(jié)構(gòu)語(yǔ)句有哪幾種?各語(yǔ)法格式介紹
2020-12-17如何使用IDEA工具開(kāi)發(fā)一個(gè)WordCount單詞計(jì)數(shù)程序?
2020-12-17Scala在使用聲明值和變量時(shí)的注意事項(xiàng)有哪些?
2020-12-17詳解MapReduce編程的Map模型和Reduce模型
2020-12-17HBase數(shù)據(jù)庫(kù)物理存儲(chǔ)的存儲(chǔ)方式介紹【大數(shù)據(jù)文章】
2020-12-17