基於RDD建立DataFrame

2021-08-14 18:24:35 字數 2685 閱讀 5441

spark sql支援兩種不同的方法用於轉換已存在的rdd成為dataset

第一種方法是使用反射去推斷乙個包含指定的物件型別的rddschema。在你的spark應用程式中當你已知schema時這個基於方法的反射可以讓你的**更簡潔。

不過這種方法要求你在寫程式時已經知道rdd物件的結構資訊,因為需要提前根據該結構資訊來定義case class。

第二種用於建立dataset的方法是通過乙個允許你構造乙個schema然後把它應用到乙個已存在的rdd的程式設計介面。然而這種方法更繁瑣,當列和它們的型別知道執行時都是未知時它允許你去構造dataset

該方法適用於在執行之前還不知道列以及列的型別的情況。

// spark sql 的 scala 介面支援自動轉換乙個包含 case classes 的 rdd 為 dataframe。 case class 定義了表的 schema。case class 的引數名使用反射讀取並且成為了列名。

// case class 也可以是巢狀的或者包含像 seqs 或者 arrays 這樣的複雜型別。這個 rdd 能夠被隱式轉換成乙個 dataframe 然後被註冊為乙個表。表可以用於後續的 sql 語句。

import org.apache.spark.sql.catalyst.encoders.expressionencoder

import org.apache.spark.sql.encoder

// for implicit conversions from rdds to dataframes

import spark.implicits._

// create an rdd of person objects from a text file, convert it to a dataframe

val peopledf = spark.sparkcontext

.textfile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => person(attributes(0), attributes(1).trim.toint))

.todf()

case class不能夠在執行之前被定義(例如,records記錄的結構在乙個string字串中被編碼了,或者乙個text文字dataset將被解析並且不同的使用者投影的字段是不一樣的)。乙個dataframe可以使用下面的三步以程式設計的方式來建立。

從原始的rdd建立rddrows(行)。

step 1被建立後,建立schema表示乙個structtype匹配rdd中的rows(行)的結構。

通過sparksession提供的createdataframe方法應用schemarddrows(行)。

import org.apache.spark.sql.types._

import org.apache.spark.sql.row

// create an rdd

val peoplerdd = spark.sparkcontext.textfile("examples/src/main/resources/people.txt")

// the schema is encoded in a string

val schemastring = "name age"

// generate the schema based on the string of schema

val fields = schemastring.split(" ")

.map(fieldname => structfield(fieldname, stringtype, nullable = true))

val schema = structtype(fields)

// convert records of the rdd (people) to rows

val rowrdd = peoplerdd

.map(_.split(","))

.map(attributes => row(attributes(0), attributes(1).trim))

val peopledf = spark.createdataframe(rowrdd, schema)

RDD操作建立RDD,轉換操作

學習完廈門大學資料庫spark課程總結 rdd是面對物件的檔案集合,類似於dataframe的一行資料,建立rdd有很多種模式 lines sc.textfile file usr local spark 檔案目錄位址 注意sc是sparkcontext縮寫可能需要import一下,這是從本地檔案建...

建立RDD方式

i 通過外部的儲存系統建立rdd,如本地檔案,hdfs等 scala val a sc.textfile root.text.txt scala val a sc.textfile hdfs hadoop 01 9000 text.txt ii 將driver的scala集合通過並行化的方式變成rd...

基於Scala的RDD運算

def addone x int int map運算 對rdd中每乙個元素做乙個轉換操作,生成乙個新的rdd println 使用具體的函式完成map運算 intrdd.map addone collect mkstring println 使用匿名函式完成map運算 intrdd.map x x ...