建立DataFrame的兩個途徑

2022-03-06 03:34:15 字數 4174 閱讀 3717

方法一 由反射機制推斷出模式:
step 1:引用必要的類。
import org.apache.spark.sql._

import sqlcontext.implicits._

//idea中此處匯入應在sqlcontext 建立之後,否則報錯,不知道為什麼。。??

//在使用spark shell時,下面這句不是必需的。

//spark shell已預設為你啟用了sql context,在需要時可直接使用sqlcontext。

val sqlcontext = new org.apache.spark.sql.sqlcontext(sc)

step 2:建立rdd。
//

匯入csv檔案並處理逗號分隔的資料

val sfpdrdd = sc.textfile("

/home/shiyanlou/sfpd.csv

").map(inc => inc.split("

,"))

step 3:定義 case class 。

case

class incidents(incidentnum:string, category:string, description:string, dayofweek:string, date:string, time:string, pddistrict:string, resolution:string, address:string, x:string, y:string, location:string, pdid:string)

step 4:將 rdd 轉換為含有 case 物件的 rdd 。

val sfpdcase = sfpdrdd.map(inc => incidents(inc(0), inc(1), inc(2), inc(3), inc(4), inc(5), inc(6), inc(7), inc(8), inc(9), inc(10), inc(11), inc(12)))

step 5:隱式轉換會將含有 case 物件的 rdd 轉換為 dataframe ,將 dataframe 的一些操作和函式應用於這個 dataframe 中。

val sfpddf = sfpdcase.todf()

方法二 通過程式設計方式構建模式:
這種方式適用於列和型別在執行時不可知的情況,我們就需要手動地去構建 dataframe 的模式。通常 dataframe 的模式在動態變化時才會使用這種方式。

注意:該方式在 case class 在不能被提前定義時,或者使用 scala 語言的專案中 case class 超過22個字段時,才會用到。

step 1:引入必要的類。

import sqlcontext.implicits._

import org.apache.spark.sql._

import org.apache.spark.sql.types._

step 2:由原始 rdd 建立乙個 row rdd 。

val rowrdd = sc.textfile("

/home/shiyanlou/data.txt

").map(x => x.split("

")).map( p => row(p(0), p(2), p(4)))

step 3:使用 structtype 和 structfield 分別建立模式。其中, structtype 對應於 table (表),structfield 對應於 field (字段)。

val testschema = structtype(array(structfield("

incnum

", stringtype, true), structfield("

date

", stringtype, true), structfield("

district

", stringtype, true)))

step 4:使用 sqlcontext 提供的方法,將模式應用於 row rdd 上,以建立 dataframe。

val testdf =sqlcontext.createdataframe(rowrdd, testschema)

//將dataframe註冊為表

testdf.registertemptable("

test")

val incs = sql("

select * from test

")

現有的大資料應用通常需要蒐集和分析來自不同的資料來源的資料。而 dataframe 支援 json 檔案、 parquet 檔案、 hive 表等資料格式。它能從本地檔案系統、分布式檔案系統(hdfs)、雲儲存(amazon s3)和外部的關係資料庫系統(通過jdbc,在spark 1.4版本起開始支援)等地方讀取資料。另外,通過 spark sql 的外部資料來源 api ,dataframe 能夠被擴充套件,以支援第三方的資料格式或資料來源。

csv:

主要是com.databricks_spark-csv_2.11-1.1.0這個庫,用於支援 csv 格式檔案的讀取和操作。

step 1:

將該壓縮檔案解壓至/home/shiyanlou/.ivy2/jars/目錄中,確保該目錄含有如圖所示的以下三個 jar 包。

step 2 匯入包:

spark-shell --packages com.databricks:spark-csv_2.11:1.1.0

step 3 直接將 csv 檔案讀入為 dataframe :

val df = sqlcontext.read.format("

com.databricks.spark.csv

").option("

header

", "

true

").load("

/home/shiyanlou/1987.csv")

//此處的檔案路徑請根據實際情況修改

step 4 根據需要修改字段型別:

def convertcolumn(df: org.apache.spark.sql.dataframe, name:string, newtype:string) =

//例如

val df_3 = convertcolumn(df_2, "arrdelay", "int")

val df_4 = convertcolumn(df_2, "depdelay", "int")

json:

sqlcontext.read.json(filepath)

擴充套件閱讀:

由於資料格式和資料來源眾多,這裡暫不一一展開講解。在實際應用中,如果需要使用某種格式的資料或者某個資料來源,應查詢其官方文件。通常官方文件(特別是 api 手冊)都提供了詳細的整合方法和指導。

在 spark 中,預設的資料來源被設定為 parquet ,所以通用的載入方式為:

sqlcontext.load("

/home/shiyanlou/data.parquet

")

如果是其他格式,則需要手動地指定格式:

sqlcontext.load("

/home/shiyanlou/data

", "

json

")

下面給出了其他的載入指定資料來源的方法:

需要注意的是,在 spark 1.4 及之後的版本中,載入資料來源的方法為:

//

預設格式parquet檔案的載入方法,需要給出檔案的路徑

sqlcontext.read.load("

/home/shiyanlou/data.parquet")

//載入其他格式的檔案,需要在format方法中指明格式

sqlcontext.read.format("

json

").load("

/home/shiyanlou/data.json

")

執行緒建立的兩個方法

參看1.6版jdk thread類的說明 1 定義個自己的執行緒類 繼承至thread類 需要實現run方法。class primethread extends thread public void run 呼叫方式 primethread p new primethread 143 p.start...

建立兩個節點的私有鏈

1.啟動第乙個節點a,使用上一節建立私有鏈的方法,在啟動控制台時加入 nodiscover 2.新建另外乙個目錄ethereum2,啟動另外乙個節點b,使用同樣的genesis.json作為創世塊,使用不同的埠,啟動控制台 3.在節點a的控制台輸入admin 在節點b的控制台加入節點a的enode,...

建立交換分割槽的兩個命令

交換分割槽在物理記憶體 ram 被填滿時用來保持記憶體中的內容。當 ram 被耗盡,linux 會將記憶體中不活動的頁移動到交換空間中,從而空出記憶體給系統使用。雖然如此,但交換空間不應被認為是物理記憶體的替代品。若系統中沒有配置交換分割槽,當記憶體耗盡後,系統可能會殺掉正在執行中的程序 應用,從而...