當我們編寫mapreduce程式的時候,都會進行輸入格式的設定,方便hadoop可以根據設定得檔案格式正確的讀取資料進行處理,一般設定**如下:
job.
setinputformatclass
(textinputformat.
class
)
通過上面的**來保證輸入的檔案是按照我們想要的格式被讀取,所有的輸入格式都繼承於inputformat,這是乙個抽象類,其子類有專門用於讀取普通檔案的fileinputformatt,用於讀取資料庫檔案的dbinputfromat,用於讀取hbase的tableinputformat等等。下面是inputformat的定義:
public
abstract
class
inputformat
textinputformat繼承於fileinputformat,檔案切分沒有進行更改,對recordreader進行了定製。
org.apache.hadoop.mapreduce.lib.input.fileinputformat
org.apache.hadoop.mapreduce.lib.input.textinputformat
檔案切分原理public
static
final string split_maxsize =
"mapreduce.input.fileinputformat.split.maxsize"
;public
static
final string split_minsize =
"mapreduce.input.fileinputformat.split.minsize"
;
fileinputformat提供了三個引數來共同控制分片的大小:
乙個檔案分片最小的有效位元組數:mapreduce.input.fileinputformat.split.minsize
乙個檔案分片最大有效位元組數: mapreduce.input.fileinputformat.split.maxsize
hdfs中塊的大小: dfs.blocksize
這三個引數按照公式splitsize = max(minimumsize, min(maximumsize, blocksize))
來進行分片大小的確定,可以通過改變上述三個引數來調節最終的分片大小。
檔案切分原始碼分析
先舉乙個切分的例項,假設我們的輸入檔案是128m,dfs的大小是40m,則這個檔案應該是有5個塊,假設我們定義的最大切分大小是30m,則根據公式max(minimumsize, min(maximumsize, blocksize)),我們的分片的大小是30m,接下來我們分析如何切分產出inputsplit,主要是分析函式getsplits
首先獲取minimumsize和maximumsize
/** 最小map分片長度 min<1, "mapreduce.input.fileinputformat.split.minsize">**/
long minsize = math.
max(
getformatminsplitsize()
,getminsplitsize
(job));
/** 最大map分片長度 mapreduce.input.fileinputformat.split.maxsize **/
long maxsize =
getmaxsplitsize
(job)
;
獲取輸入目錄下所有的檔案狀態資訊
list
files =
liststatus
(job)
;
遍歷檔案進行每個檔案的切分
for
(filestatus file: files)
檔案長度不為0時候,首先獲取檔案的位置資訊;檔案長度為0,直接創造乙個空的host的陣列返回
blocklocation[
] blklocations;
/** 獲取改檔案所在的位置資訊 **/
if(file instanceof
locatedfilestatus
)else
然後如果檔案是可切分的,進行blocksize獲取,求出切分大小,按照切分大小進行切分。
/** 乙個檔案塊大小:預設為128m **/
long blocksize = file.
getblocksize()
;/** 根據檔案塊大小,map最小分片大小和最大分片大小確定分片的大小:
公式:max(minimumsize, min(maximumsize, blocksize))` **/
long splitsize =
computesplitsize
(blocksize, minsize, maxsize)
;long bytesremaining = length;
// 還剩餘的檔案長度
/** slpit_slop是1.1避免最後剩一點點檔案大小也劃分乙個map **/
while((
(double
) bytesremaining)
/splitsize > split_slop)
/** 為滿足切分條件,但是還剩下部分資料 **/
if(bytesremaining !=0)
獲取檔案塊的index是通過函式getblockindex
進行的,我們根據開頭的例子進行具體分析
protected
intgetblockindex
(blocklocation[
] blklocations,
long offset)
} blocklocation last = blklocations[blklocations.length -1]
;long filelength = last.
getoffset()
+ last.
getlength()
-1;throw
newillegalargumentexception
("offset "
+ offset +
" is outside of file (0.."
+ filelength +
")")
;}
至此我們分析完了切分的過程,最終返回的是切分的檔案的元資訊,包含了檔案位置,要讀取得開始位置,讀取的長度,塊所在的host資訊等。
hadoop 原始碼分析一
inputformat inputsplit 繼承自writable介面,因此乙個inputsplit實則包含了四個介面函式,讀和寫 readfields和 write getlength能夠給出這個split中所記錄的資料大小,getlocations能夠得到這個split位於哪些主機之上 blk...
Hadoop原始碼之JobTracker
jobtracker是map reducer中任務排程的伺服器。1 有如下執行緒為其服務 1 提供兩組rpc服務 intertrackerprotocol jobsubmissionprotocol 的1個listener執行緒與預設10個handler執行緒 2 提供任務執 況查詢的一組web服務...
Hadoop 中 IPC 的原始碼分析
最近開始看 hadoop 的一些原始碼,展開hadoop的原始碼包,各個元件分得比較清楚,於是開始看一下 ipc 的一些原始碼。ipc模組,也就是程序間通訊模組,如果是在不同的機器上,那就可以理解為 rpc 了,也就是遠端呼叫。事實上,hadoop 中的 ipc 也就是基於 rpc 實現的。使用 s...