方法一 由反射機制推斷出模式:
step 1:引用必要的類。
import org.apache.spark.sql._import sqlcontext.implicits._
//idea中此處匯入應在sqlcontext 建立之後,否則報錯,不知道為什麼。。??
//在使用spark shell時,下面這句不是必需的。
//spark shell已預設為你啟用了sql context,在需要時可直接使用sqlcontext。
val sqlcontext = new org.apache.spark.sql.sqlcontext(sc)
step 2:建立rdd。
//匯入csv檔案並處理逗號分隔的資料
val sfpdrdd = sc.textfile("
/home/shiyanlou/sfpd.csv
").map(inc => inc.split("
,"))
step 3:定義 case class 。
caseclass incidents(incidentnum:string, category:string, description:string, dayofweek:string, date:string, time:string, pddistrict:string, resolution:string, address:string, x:string, y:string, location:string, pdid:string)
step 4:將 rdd 轉換為含有 case 物件的 rdd 。
val sfpdcase = sfpdrdd.map(inc => incidents(inc(0), inc(1), inc(2), inc(3), inc(4), inc(5), inc(6), inc(7), inc(8), inc(9), inc(10), inc(11), inc(12)))
step 5:隱式轉換會將含有 case 物件的 rdd 轉換為 dataframe ,將 dataframe 的一些操作和函式應用於這個 dataframe 中。
val sfpddf = sfpdcase.todf()
方法二 通過程式設計方式構建模式:這種方式適用於列和型別在執行時不可知的情況,我們就需要手動地去構建 dataframe 的模式。通常 dataframe 的模式在動態變化時才會使用這種方式。
注意:該方式在 case class 在不能被提前定義時,或者使用 scala 語言的專案中 case class 超過22個字段時,才會用到。step 1:引入必要的類。
import sqlcontext.implicits._import org.apache.spark.sql._
import org.apache.spark.sql.types._
step 2:由原始 rdd 建立乙個 row rdd 。
val rowrdd = sc.textfile("/home/shiyanlou/data.txt
").map(x => x.split("
")).map( p => row(p(0), p(2), p(4)))
step 3:使用 structtype 和 structfield 分別建立模式。其中, structtype 對應於 table (表),structfield 對應於 field (字段)。
val testschema = structtype(array(structfield("incnum
", stringtype, true), structfield("
date
", stringtype, true), structfield("
district
", stringtype, true)))
step 4:使用 sqlcontext 提供的方法,將模式應用於 row rdd 上,以建立 dataframe。
val testdf =sqlcontext.createdataframe(rowrdd, testschema)//將dataframe註冊為表
testdf.registertemptable("
test")
val incs = sql("
select * from test
")
現有的大資料應用通常需要蒐集和分析來自不同的資料來源的資料。而 dataframe 支援 json 檔案、 parquet 檔案、 hive 表等資料格式。它能從本地檔案系統、分布式檔案系統(hdfs)、雲儲存(amazon s3)和外部的關係資料庫系統(通過jdbc,在spark 1.4版本起開始支援)等地方讀取資料。另外,通過 spark sql 的外部資料來源 api ,dataframe 能夠被擴充套件,以支援第三方的資料格式或資料來源。
csv:
主要是com.databricks_spark-csv_2.11-1.1.0
這個庫,用於支援 csv 格式檔案的讀取和操作。
step 1:
將該壓縮檔案解壓至/home/shiyanlou/.ivy2/jars/
目錄中,確保該目錄含有如圖所示的以下三個 jar 包。
step 2 匯入包:
spark-shell --packages com.databricks:spark-csv_2.11:1.1.0
step 3 直接將 csv 檔案讀入為 dataframe :
val df = sqlcontext.read.format("com.databricks.spark.csv
").option("
header
", "
true
").load("
/home/shiyanlou/1987.csv")
//此處的檔案路徑請根據實際情況修改
step 4 根據需要修改字段型別:
def convertcolumn(df: org.apache.spark.sql.dataframe, name:string, newtype:string) =//例如
val df_3 = convertcolumn(df_2, "arrdelay", "int")
val df_4 = convertcolumn(df_2, "depdelay", "int")
json:
sqlcontext.read.json(filepath)
擴充套件閱讀:
由於資料格式和資料來源眾多,這裡暫不一一展開講解。在實際應用中,如果需要使用某種格式的資料或者某個資料來源,應查詢其官方文件。通常官方文件(特別是 api 手冊)都提供了詳細的整合方法和指導。
在 spark 中,預設的資料來源被設定為 parquet ,所以通用的載入方式為:
sqlcontext.load("/home/shiyanlou/data.parquet
")
如果是其他格式,則需要手動地指定格式:
sqlcontext.load("/home/shiyanlou/data
", "
json
")
下面給出了其他的載入指定資料來源的方法:
需要注意的是,在 spark 1.4 及之後的版本中,載入資料來源的方法為:
//預設格式parquet檔案的載入方法,需要給出檔案的路徑
sqlcontext.read.load("
/home/shiyanlou/data.parquet")
//載入其他格式的檔案,需要在format方法中指明格式
sqlcontext.read.format("
json
").load("
/home/shiyanlou/data.json
")
執行緒建立的兩個方法
參看1.6版jdk thread類的說明 1 定義個自己的執行緒類 繼承至thread類 需要實現run方法。class primethread extends thread public void run 呼叫方式 primethread p new primethread 143 p.start...
建立兩個節點的私有鏈
1.啟動第乙個節點a,使用上一節建立私有鏈的方法,在啟動控制台時加入 nodiscover 2.新建另外乙個目錄ethereum2,啟動另外乙個節點b,使用同樣的genesis.json作為創世塊,使用不同的埠,啟動控制台 3.在節點a的控制台輸入admin 在節點b的控制台加入節點a的enode,...
建立交換分割槽的兩個命令
交換分割槽在物理記憶體 ram 被填滿時用來保持記憶體中的內容。當 ram 被耗盡,linux 會將記憶體中不活動的頁移動到交換空間中,從而空出記憶體給系統使用。雖然如此,但交換空間不應被認為是物理記憶體的替代品。若系統中沒有配置交換分割槽,當記憶體耗盡後,系統可能會殺掉正在執行中的程序 應用,從而...