下圖列出了涉及mapreduce讀取資料的幾個核心類以及常見的幾種擴充套件。
如圖所示,inputformat類抽象了兩個方法,建立分片的getsplit( )和建立資料讀取工具的createrecordreader( ),可以擴充套件inputformat重寫這兩個方法來實現不同資料來源讀取資料,或者採用不同的方式讀取資料。
inputsplit表示資料的邏輯分片,最常見的是用於表示文字檔案分片的filesplit類,該類擴充套件了inputspit,包含了檔案的路徑、分片起始位置在原始檔中的位元組偏移量、分片的位元組長度以及分片所屬檔案塊存在資料節點資訊。用於分片資訊在客戶端提交作業時會被序列化到檔案然後提交,並且在作業執行中會被反序列化,所以filesplit還實現了writable介面,實現了write(dataoutput out)和readfields(datainput in)兩個方法。
正真為mapreduce提供資料輸入的是recordreader類,給它分配乙個分片,它就從資料來源中讀取分片指定的資料段、並將資料組織成指定的資料結構。
hadoop預設提供的一些資料讀取類基本可以滿足多數需求,特殊情況下我們也可以自己擴充套件。以下用乙個簡單了例子介紹擴充套件方式。
擴充套件的目的是從多個不同型別的資料庫(mysql、oracle、db2等)、或者多張表讀取資料。**如下:
public class multitableinputsplit extends dbinputsplit implements writablemultiinputsplit 類間接擴充套件了inputsplit類,新增了資料庫連線資訊和查詢資料所使用的sql語句。public multitableinputsplit(long start, long end, string intputsql,
string dbconurl, string username, string password, string dbtype)
@override
public void write(dataoutput output) throws ioexception
@override
public void readfields(datainput input) throws ioexception
//get、set等方法省略
}
public class multitableinputformat extends inputformatreturn splits;
} @override
public recordreadercreaterecordreader(
inputsplit split, taskattemptcontext context) throws ioexception,
interruptedexception catch (sqlexception ex)
} /**
* 可以根據表的數量、表的大小控制分片的數量
*/private int getsplitcount(string sqlinfo)
/*** 計算分片的大小
*/private int getsplitsize(string sqlinfo)
public void getsplit(string inputquery, jobcontext job, listsplits) else
splits.add(split);
} }}
class multitablerecordreader extends recordreader@override
public boolean nextkeyvalue() throws ioexception
if (value == null)
if (null == this.results)
if (!results.next())
key.set(pos + split.getstart());
value.readfields(results);
pos++;
} catch (sqlexception e)
return true; }
@override
public longwritable getcurrentkey()
@override
public mapdbwritable getcurrentvalue()
@override
public float getprogress() throws ioexception
@override
public void initialize(inputsplit split, taskattemptcontext context)
throws ioexception, interruptedexception
/*** 根據不同的資料庫型別實現不同的分頁查詢
*/protected string getselectquery() if(dbtype.equalsignorecase("oracle")) else
}} catch (ioexception ex)
return query.tostring();
} public void initconnection() throws sqlexception
@override
public void close() throws ioexception
if (null != statement)
if (null != connection)
} catch (sqlexception e)
} protected resultset executequery(string query) throws sqlexception
public connection getconnection()
public dbinputformat.dbinputsplit getsplit()
protected void setstatement(preparedstatement stmt)
}
public class mapdbwritable implements dbwritable通過擴充套件以上幾個類就可以從多個資料庫、多個表讀取資料了。public void readfields(resultset resultset) throws sqlexception
} @override
public string tostring()
return builder.substring(0, builder.length() - 1);
} public void write(preparedstatement preparedstatement) throws sqlexception
public mapgetvalues()
public void setvalues(mapvalues)
public string getcolnames()
public void setcolnames(string colnames)
public mapgetcoltype()
public void setcoltype(mapcoltype)
}
mapreduce資料傾斜
前言 資料傾斜是日常大資料查詢中 的乙個bug,遇不到它時你覺得資料傾斜也就是書本部落格上的乙個無病呻吟的偶然案例,但當你遇到它是你就會懊悔當初怎麼不多了解一下這個赫赫有名的事故。當然你和資料傾斜的緣分深淺還是看你公司的業務邏輯和資料量有沒有步入資料傾斜的領地。說明 關於資料傾斜的產生原因我將結合 ...
HBASE 資料操作,MapReduce
前面已經對hbase有了不少了解了,這篇重點在實踐操作。hbase本身是乙個很好的key value的儲存系統,但是也不是萬能的,很多時候還是要看用在什麼情形,怎麼使用。kv之類的資料庫就是要應用在這類快速查詢的應用上,而不是像傳統的sql那樣關聯查詢,分組計算,這些可就不是hbase的長處了。下面...
大資料之Map reduce
大資料問題一般解決方式 利用雜湊函式進行分流來解決記憶體限制或者其他限制的問題。1.雜湊函式又叫雜湊函式,雜湊函式的輸入域可以是非常大的範圍,但是輸出域是固定範圍。假設為s。雜湊函式的性質 1.典型的雜湊函式都擁有無限的輸入值域。2.輸入值相同時 返回值一樣。3.輸入值不同時,返回值可能一樣,也可能...