今天去面試了一波,因為排程系統採用了sparksql實現資料從mysql到hive,在這一點上面試官很明顯很不滿我對於spark的理解,19年的第乙個面試就這麼掛了。
有問題不怕,怕的是知道了問題還得過且過。現在就來梳理下我的專案是怎麼使用spark導數的
第一步:把mysql中的表放入記憶體
properties.put("user", dbuser);
properties.put("password", dbpassword);
properties.put("driver", dbdriver);
datasetbizdateds = sparksession.read().jdbc(
dburl,
dbtablename,
properties
);
其中:org.apache.spark.sql.dataset(這裡面試官問我怎麼把mysql的資料轉化到spark,我沒答上來)
第二步:建立資料庫與表
2.1 建立庫
string createdbsql = "create database if not exists " + hivedbname + " location '" + dbpath + "'";
sparksession.sql(createdbsql);
```
2.2建立表
分成兩步,第一步讀取mysql元資料字段,第二步把這些字段建立出來
2.2.1 讀取mysql欄位
structtype structtype = bizdateds.schema();
structfield structfields = structtype.fields();
/*structfield是structtype中的字段。
param:name此字段的名稱。
param:datatype此字段的資料型別。
param:nullable指示此字段的值是否為空值。
param:metadata此字段的元資料。 如果未修改列的內容(例如,在選擇中),則應在轉換期間保留元資料。
*/
2.2.2 建立字段
string sourcetype; //name of the type used in json serialization.
string columnname;
string targettype;
structfield structfield;
sparkdatatypeenum sparkdatatype;
stringbuilder createbuilder = new stringbuilder(capacity);
listdbtablecolumns = lists.newarraylist();
mapdbtablecolumntypemap = maps.newhashmap();
//把mysql中的每個欄位都提取出來
for (int i = 0, len = structfields.length; i < len; i++)
sparkdatatype = sparkdatatypeenum.getitembytype(sourcetype);
if (null != sparkdatatype) else
dbtablecolumns.add(columnname);
dbtablecolumntypemap.put(columnname, targettype);
if (i != 0)
}sparksession.sql(createtablesql);
2.3 對比字段
我們在2.2中,如果hive有字段了,那麼就不會建立表。
問題在於,如果hive中的字段比mysql中的少怎麼辦?
2.3.1 獲取hive中的表字段
hiveutil connectiontohive = new hiveutil("org.apache.hive.jdbc.hivedriver", hiveurl, hiveuser, hivepassword);
public listgettablecolumns(string dbname,string tablename) throws sqlexception
databasemetadata metadata = connection.getmetadata();
rs = metadata.getcolumns(null, dbname, tablename.touppercase(), "%");
listcolumns = new arraylist();
while (rs.next())
return columns;
} catch (sqlexception e) finally
}}
2.3.2 對比字段並且新增:
for (string dbtablecolumn : dbtablecolumns)
if (!hivetablecolumns.contains(dbtablecolumn))
}
2.4 將記憶體中的表存入hive
bizdateds.createorreplacetempview(tmptablename); //注意這裡不是直接從mysql抽到hive,而是先從mysql抽到記憶體中
insert hive_table select hive中的已經有的表的字段 from tmptablename
##很明顯的,如果不是需要和hive已經有的表互動根本用不到jdbc
sparksql實現單詞計數
1 建立sparksession val sparksession sparksession.builder master local 2 getorcreate 2 載入資料,使用dataset處理資料集 read來讀取可以直接返回dataset string 這是個比rdd更高階的資料集 它返回...
SparkSQL之關聯mysql和hive查詢
create database spark use spark create table dept deptno int 2 primary key,dname varchar 14 loc varchar 13 insert into dept value 10,accounting new yo...
SparkSQL的3種Join實現
大家知道,在資料庫的常見模型中 比如星型模型或者雪花模型 表一般分為兩種 事實表和維度表。維度表一般指固定的 變動較少的表,例如聯絡人 物品種類等,一般資料有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。因為join操作是對兩個表中key值相同的記錄進行連線,在sparksq...