spark sql是spark框架的重要組成部分, 主要用於結構化資料處理和對spark資料執行類sql的查詢。
dataframe是乙個分布式的,按照命名列的形式組織的資料集合。 一張sql資料表可以對映為乙個dataframe物件,dataframe是spark sql中的主要資料結構。
sqlcontext例項是dataframe和spark sql的操作入口, pyspark互動環境中已初始化了乙個sqlcontext例項, 在提交任務指令碼時需要使用乙個sparkcontext來初始化:
from pyspark.sql import sqlcontext
sqlcontext = sqlcontext(sparkcontext)
本文測試環境為spark 2.1.0, python api.
sqlcontext.createdataframe
方法可以從python的list中建立dataframe:
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
>>> df = sqlcontext.createdataframe(data)
>>> df.collect()
[row(_1=u'a', _2=1, _3=18),
row(_1=u'b', _2=2, _3=22),
row(_1=u'c', _2=3, _3=20)]
list中的每一項成為dataframe中的一行, 每一列的名字預設為_1
,_2
,_3
.
同樣可以使用rdd來建立:
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
>>> rdd = sc.parallelize(data)
>>> df = sqlcontext.createdataframe(rdd)
>>> df.collect()
[row(_1=u'a', _2=1, _3=18),
row(_1=u'b', _2=2, _3=22),
row(_1=u'c', _2=3, _3=20)]
或者採用更簡單的方法:
>>> df = rdd.todf()
>>> >>> df.collect()
[row(_1=u'a', _2=1, _3=18),
row(_1=u'b', _2=2, _3=22),
row(_1=u'c', _2=3, _3=20)]
createframe的第二個引數為可選引數schema用於定義每一列的名稱和型別:
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
>>> df = sqlcontext.createdataframe(data, ['name', 'id', 'age'])
>>> df.collect()
[row(name=u'a', id=1, age=18),
row(name=u'b', id=2, age=22),
row(name=u'c', id=3, age=20)]
同樣可以使用元素為dict的列表建立dataframe例項:
>>> data = [
... ,
... ,
... ]
>>> df = sqlcontext.createdataframe(data)
>>> df.collect()
[row(name=u'a', id=1, age=18),
row(name=u'b', id=2, age=22),
row(name=u'c', id=3, age=20)]
不過spark官方推薦使用row物件來代替dict:
>>> from pyspark.sql import row
>>> user = row('name', 'id', 'age')
>>> row1 = user('a', 1, 18)
>>> row2 = user('b', 2, 22)
>>> row3 = user('b', 3, 20)
>>> data = [row1, row2, row3]
>>> df = sqlcontext.createdataframe(data)
>>> df.collect()
[row(name=u'a', id=1, age=18),
row(name=u'b', id=2, age=22),
row(name=u'c', id=3, age=20)]
schema引數也可以使用pyspark中定義的字段型別:
>>> from pyspark.sql.types import structtype, structfield
>>> from pyspark.sql.types import stringtype, integertype
>>> schema = structtype([
... structfield("name", stringtype(), true), # name, type, nullable
... structfield("id", integertype(), true),
... structfield("age", integertype(), true)])
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
>>> df = sqlcontext.createdataframe(data, schema)
>>> df.collect()
[row(name=u'a', id=1, age=18),
row(name=u'b', id=2, age=22),
row(name=u'c', id=3, age=20)]
更多關於createdataframe方法的資訊可以參考官方文件
sqlcontext.read
是乙個pyspark.sql.dataframereader
物件, 它可以用於根據外部資料來源建立dataframe, 包括讀取檔案和使用jdbc讀取資料庫。
詳情可以參考官方文件
dataframe提供了一些常用操作的實現, 可以使用這些介面檢視或修改dataframe:
dataframe的一些屬性可以用於檢視它的結構資訊:
dataframe支援使用map和reduce操作:
dataframe的結構可以進行一些修改:
>>> df.drop('age')
dataframe[age:int, id: int]
>>>df.drop(df.name)
dataframe[age:int, id: int]
同樣可以查詢dataframe中特定的記錄:
>>> df.filter(df.age>=20).collect()
[row(name=u'b', id=2, age=22), row(name=u'c', id=3, age=20)]
>>> df.select('*').collect()
[row(name=u'a', id=1, age=18), row(name=u'b', id=2, age=22), row(name=u'c', id=3, age=20)]
>>> df.select(df.id, df.age-1).collect()
[row(id=1, (age - 1)=17), row(id=2, (age - 1)=21), row(id=3, (age - 1)=19)]
>>> df.collect()
[row(name=u'a', id=1, age=18), row(name=u'b', id=2, age=22), row(name=u'c', id=3, age=20)]
>>> df2.collect()
[row(id=1, nation=u'cn'), row(id=2, nation=u'us'), row(id=4, nation=u'uk')]
>>> df.join(df2, 'id').collect()
[row(id=1, name=u'a', age=18, nation=u'cn'), row(id=2, name=u'b', age=22, nation=u'us')]
更多資訊可以參考官方文件 Spark SQL是處理結構化的資料
spark sql是處理結構化的資料,可以儲存在二維表中,類似資料庫中的表一樣儲存資料 spark1.x val sqlcontext new sparkcontext conf val sqlcontext new sqlcontext sc 將rdd和schema資訊關聯到一起,1,rdd和ca...
結構化資料 半結構化資料 非結構化資料
結構化資料 即行資料,儲存在資料庫裡,可以用二維表結構來邏輯表達實現的資料 所謂半結構化資料,就是介於完全結構化資料 如關係型資料庫 物件導向資料庫中的資料 和完全無結構的資料 如聲音 影象檔案等 之間的資料,html文件就屬於半結構化資料。它一般是自描述的,資料的結構和內容混在一起,沒有明顯的區分...
結構化資料 半結構化資料和非結構化資料
本文 在實際應用中,我們會遇到各式各樣的資料庫如nosql非關聯式資料庫 memcached,redis,mangodb rdbms關聯式資料庫 oracle,mysql等 還有一些其它的資料庫如hbase,在這些資料庫中,又會出現結構化資料,非結構化資料,半結構化資料,下面列出各種資料型別 結構化...