通常在乙個流式計算的主流程裡,會用到很多對映資料,比較常見的是text文件,但是文件讀進來之後還要匹配相應的schema,本文通過自定義textsource資料來源,自動讀取預設的schema。
defaultsource.scala
package com.wxx.bigdata.sql_custome_source
import org.apache.spark.sql.sqlcontext
import org.apache.spark.sql.sources.
import org.apache.spark.sql.types.structtype
class defaultsource extends relationprovider with schemarelationprovider
} override def createrelation(sqlcontext: sqlcontext, parameters: map[string, string]) :baserelation =
}
textdatasourcerelation.scala
package com.wxx.bigdata.sql_custome_source
import com.wxx.bigdata.utils.utils
import org.apache.spark.internal.logging
import org.apache.spark.rdd.rdd
import org.apache.spark.sql.
import org.apache.spark.sql.sources.
import org.apache.spark.sql.types.
class textdatasourcerelation(override val sqlcontext: sqlcontext,
path : string,
userschema : structtype) extends baserelation with tablescan with loggingelse
} override def buildscan() :rdd[row] = else if(value == "1")else
} else , schemafield(index).datatype)}})
result.map(x => row.fromseq(x))
})rows.flatmap(x => x)
}}
utils.scala
package com.wxx.bigdata.utils
import org.apache.spark.sql.types.
object utils
}}
package com.wxx.bigdata.sql_custome_source
import org.apache.spark.sql.sparksession
def main(args: array[string]): unit =
}
測試文件
欄位名依次為id,name,性別(0:男,1女),薪水,獎金
10000,zhangsan,0,100000,200000
10001,lisi,0,99999,199999
10002,wangwu,0,2000,5
10003,zhaoliu,0,2001,6
10004,tianqi,0,2007,7
Spark SQL中自定義函式詳解
資料來源 初始化sparksession package com.kfk.spark.common import org.apache.spark.sql.sparksession author 蔡政潔 email caizhengjie888 icloud.com date 2020 12 2 t...
Spark Sql之UDAF自定義聚合函式
udaf user defined aggregate function。使用者自定義聚合函式 我們可能下意識的認為udaf是需要和group by一起使用的,實際上udaf可以跟group by一起使用,也可以不跟group by一起使用,這個其實比較好理解,聯想到mysql中的max min等函...
scanf fscanf 自定義讀取規則
個人覺得非常有用但是較少人知道,所以 出來,也方便自己以後回顧。標準輸入輸出函式 和 n說明符的使用方法 scanf fscanf,均從第乙個非空格的可顯示字元開始讀起!標準輸入輸出函式scanf具有相對較多的轉換說明符,它常常作為入門級函式出現在各種教材中。但奇怪的是,和n這兩種都為c89 c99...