在2.0版本之前,使用spark必須先建立sparkconf和sparkcontext,
不過在spark2.0中只要建立乙個sparksession就夠了,sparkconf、sparkcontext和sqlcontext都已經被封裝在sparksession當中。
在與spark2.0互動之前必須先建立
spark
物件
val spark = sparksession.builder()
.master(masterurl)
.config("spark.some.config.option", "some-value")
.getorcreate()
當建立好了sparksession,我們就可以配置spark執行相關屬性。比如下面**片段我們修改了已經存在的執行配置選項。
spark.conf.set("spark.sql.shuffle.partitions", 6)spark.conf.set("spark.executor.memory", "2g")
絕大多數的屬性控制應用程式的內部設定,並且預設值都是比較合理的。下面對這些屬性進行說明:
該屬性沒有預設值,它的含義是你的應用程式的名字,這個名字設定之後將會在web ui上和日誌資料裡面顯示。如果這個屬性沒有設定的話,將會把你應用程式的
main
函式所在類的全名作為應用程式的名稱。在
yarn
環境下,還可以用
--name
或者來設定應用程式的名稱。為了能夠方便地檢視各個應用程式的含義,取乙個好的名字是很重要的。
該屬性沒有預設值。這是spark程式需要連線的集群管理器所在的
url位址。當前的
spark
支援三種集群方式
standalone
、apache mesos
以及yarn
模式。如果這個屬性在提交應用程式的時候沒設定,程式將會通過
system.getenv("master")
來獲取master
環境變數;但是如果
master
環境變數沒有設定,那麼程式將會把
master
的值設定為
local[*]
,之後程式將在本地啟動。
該屬性的預設值是512m。每個
executor
處理器可以使用的記憶體大小之和,跟
jvm的記憶體表示的字串格式是一樣的
(比如:
'512m'
,'2g')
。在早期版本的
spark
,是通過
-xmx
和-xms
來設定的。如果這個值沒有設定,那麼程式將會先獲取
spark_executor_memory
環境變數;如果還沒設定,那麼獲取
spark_mem
環境變數的值;如果這個值也沒設定,那麼這個值將會別設定為
512,
。幾乎所有執行時效能相關的內容都或多或少間接和記憶體大小相關。這個引數最終會被設定到executor的
jvm的
heap
尺寸上,對應的就是
xmx和
xms的值。
預設值是org.apache.spark.serializer.j**aserializer。用於序列化網路傳輸或者以序列化形式快取起來的各種物件的類。預設的
serializer
可以對所有的
j**a
物件進行序列化,但是它的速度十分慢!所以如果速度是影響程式執行的關鍵,你可以將該值設定為
org.apache.spark.serializer.kryoserializer
。在一些情況下,
kryoserializer
的效能可以達到
j**aserializer的10
倍以上,但是相對於
j**aserializer
而言,主要的問題是它不能支援所有的
j**a
物件。當然,使用者可以直接繼承
org.apache.spark.serializer
來實現自己的
serializer
。預設值為空。如果你使用了kryoserializer,就要為
kryo
設定這個類去註冊你自定義的類,該類必須繼承自
kryoregistrator
,實現其中的
registerclasses(kryo: kryo)
即可。預設值為/tmp。用於設定
spark
的快取目錄,包括了
輸出的檔案,快取到磁碟的
rdd資料。最好將這個屬性設定為訪問速度快的本地磁碟。同
hadoop
一樣,可以用逗號分割來設定多個不同磁碟的目錄。需要注意,在
spark 1.0
和之後的版本,這個屬性將會被
spark_local_dirs (standalone, mesos)
或者 local_dirs (yarn)
環境變數替代。
預設值是false。當
sparkcontext
啟動的時候,以
info
日誌級別記錄下有效的
sparkconf
。當建立好sparksession後,就可以讀取資料了
用乙個map來儲存讀取檔案的格式
val options = map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs:")
再讀取hdfs上的資料
val data all=spark.sqlcontext.read.options(options).format("com.databricks.spark.csv").load()或者val data all=spark.sqlcontext.read.options(options).csv(filepath)
儲存資料在hdfs上
val s**eoption = map("header" -> "true", "delimiter" -> "\t", "path" -> path)data.repartition(1).write.format("com.databricks.spark.csv").mode(s**emode.overwrite).options(s**eoption).s**e()
讀取的資料建立臨時**
data.createorreplacetempview("groupdata")
可用sparksession.sql對資料進行字段提取,處理
val result = spark.sql("select ip,sum(count) count from groupdata group by ip")
spark 讀取 hdfs 資料分割槽規則
下文以讀取 parquet 檔案 parquet hive table 為例 hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertmetastoreparquet 控制,預設為 true。如果設定為 true 會使用 org.apache.s...
spark 讀取 hdfs 資料分割槽規則
下文以讀取 parquet 檔案 parquet hive table 為例 hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertmetastoreparquet 控制,預設為 true。如果設定為 true 會使用 org.apache.s...
hbase資料加鹽讀取(spark篇)
未加鹽資料 spark可以使用inputformat outputformat來讀寫hbase表。加鹽以後 需要在rowkey之前加一些字首,否則是查不到資料的。1 我們需要重新寫getsplits方法 從名字我們可以知道是要計算有多少個splits。在hbase中,乙個region對用乙個spli...