整個處理流程非常簡單。首先,解析輸入流,得到讀資料塊訊息協議opreadblockproto,即proto,並建立tracescope型別的tracescope;然後從讀資料塊訊息協議proto中解析出讀資料塊的各種引數,比如需要讀取的資料塊block、訪問令牌blocktoken、客戶端名稱clientname、資料塊讀取的起始偏移量blockoffset、資料塊讀取的長度length、是否傳送塊校驗sendchecksum、快取策略cachingstrategy型別的cachingstrategy等,利用這些引數呼叫子類dataxceiver執行緒的readblock()方法,進行讀資料塊的處理,最終關閉tracescope,整個資料塊讀取過程完畢。/** process op by the corresponding method. */
protected final void processop(op op) throws ioexception
}
那麼今天,我們首先來看下第一種資料讀寫請求--讀資料塊read_block,它是通過呼叫opreadblock()方法完成的,我們先看下這個方法的**:
/** receive op_read_block */
private void opreadblock() throws ioexception finally
}
我們再來看下其中涉及的部分細節。首先,在我們要概括性的講解讀資料塊訊息協議opreadblockproto前,我們先看下對於輸入流是怎麼處理的,答案就在類pbhelper中的vintprefixed()方法中,其**如下:
public static inputstream vintprefixed(final inputstream input)
throws ioexception
// codedinputstream用來讀取和解碼協議訊息字段。
// varint是一種數值壓縮儲存方法
// readrawvarint32()方法從輸入流中讀取乙個原始的varint,並且,如果高於32位,丟棄之。
// firstbyte是為了告訴codedinputstream已經從輸入流input中讀取了1個位元組
// 返回結果為int型別的訊息大小
int size = codedinputstream.readrawvarint32(firstbyte, input);
// 確保訊息大小必須大於0
assert size >= 0;
// 將輸入流input包裝成exactsizeinputstream,從該輸入流中只能讀取size大小的資料
// exactsizeinputstream是一種從其他輸入流中讀取固定大小資料的輸入流。
return new exactsizeinputstream(input, size);
}
首先呢,從輸入流input中讀入第乙個位元組byte,然後呼叫codedinputstream的readrawvarint32()方法,獲取請求內容的大小。codedinputstream用來讀取和解碼協議訊息字段。varint是一種數值壓縮儲存方法。readrawvarint32()方法從輸入流中讀取乙個原始的varint,並且,如果高於32位,丟棄之。firstbyte是為了告訴codedinputstream已經從輸入流input中讀取了1個位元組,返回結果為int型別的訊息大小,同時確保訊息大小必須大於0。最後,將輸入流input包裝成exactsizeinputstream,從該輸入流中只能讀取size大小的資料,exactsizeinputstream是一種從其他輸入流中讀取固定大小資料的輸入流。
接下來,我們再說下解析輸入流,得到讀資料塊訊息協議opreadblockproto。這個opreadblockproto是什麼呢?它是谷歌開源的protobuf在hdfs中定義的進行資料傳輸時的一種訊息協議,其訊息格式的定義在檔案datatransfer.proto中,內容如下:
message opreadblockproto
其中,header、offset、len為必須的,因為它們使用了關鍵字required,而剩餘兩個sendchecksums、cachingstrategy則由於使用了關鍵字optional,所以為可選的。並且,header為clientoperationheaderproto型別,而clientoperationheaderproto也是一種訊息格式,定義如下:
message clientoperationheaderproto
其中,baseheader還是protobuf定義的一種訊息格式,其名稱為baseheaderproto,其定義如下:
message baseheaderproto
它包含了資料塊block,即extendedblockproto,所以,在獲得讀資料塊訊息協議opreadblockproto之後,呼叫readblock()方法之前,我們可以使用如下語句:
pbhelper.convert(proto.getheader().getbaseheader().getblock()
來獲得readblock()方法需要使用的引數extendedblock。讀資料塊訊息協議中的其他字段不再多一一介紹,讀者可自行分析。
最後,我們來看下讀取資料塊的readblock()方法,其**如下:
@override
public void readblock(final extendedblock block,
final tokenblocktoken,
final string clientname,
final long blockoffset,
final long length,
final boolean sendchecksum,
final cachingstrategy cachingstrategy) throws ioexception catch(ioexception e)
// send op status
// 傳送操縱狀態
writesuccesswithchecksuminfo(blocksender, new dataoutputstream(getoutputstream()));
// 呼叫資料塊傳送器blocksender的sendblock()方法,傳送資料塊
long read = blocksender.sendblock(out, basestream, null); // send data
if (blocksender.didsendentirebyterange())
} catch (ioexception ioe)
} else
// 資料節點datanode記錄相關系統效能指標的增長,這裡是讀取的位元組數、讀取的塊數
datanode.metrics.incrbytesread((int) read);
datanode.metrics.incrblocksread();
} catch ( socketexception ignored )
// its ok for remote side to close the connection anytime.
datanode.metrics.incrblocksread();
ioutils.closestream(out);
} catch ( ioexception ioe ) finally
//update metrics
datanode.metrics.addreadblockop(elapsed());
datanode.metrics.incrreadsfromclient(peer.islocal());
}
readblock()方法大體處理流程如下:
1、將請求中的客戶端名稱clientname賦值給previousopclientname;
2、獲取輸出流basestream,即socketout;
3、將輸出流basestream依次包裝成bufferedoutputstream、dataoutputstream,其緩衝區大小取引數io.file.buffer.size的一半,引數未配置的話預設為512,且最大也不能超過512;
4、呼叫checkaccess()方法進行訪問許可權檢查;
5、傳送資料塊:
5.1、 獲取資料節點註冊資訊datanoderegistration;
5.2、更新當前執行緒名稱:sending block...;
5.3、構造資料塊傳送器blocksender物件blocksender,構造時,需要對應資料塊block、資料在塊中的起始位置blockoffset、讀取資料的長度length等資訊;
5.4、呼叫writesuccesswithchecksuminfo()方法傳送操作狀態;
5.5、呼叫資料塊傳送器blocksender的sendblock()方法,傳送資料塊;
5.6、資料節點datanode記錄相關系統效能指標的增長,這裡是讀取的位元組數、讀取的塊數;
5.7、關閉資料塊傳送器。
大體處理流程就是這個樣子。而關於blocksender及其構造、如何定位資料以及如何傳送資料等,我們將會在專門的文章中進行分析,敬請期待!
HDFS原始碼分析 RPC Client實現
通俗來講rpc remote procedure call 就是呼叫遠端的過程或者方法,既然涉及到遠端,必然會有 c s架構,即 client 和server 下面首先來看一下 client 端的實現。為實現遠端方法呼叫,最重要的就是跟遠端伺服器進行連線,然後不斷的傳輸客戶端想要呼叫的方法,包括方法...
HDFS的DataNode原始碼分析
1.大致流程 datanode.main 入口函式 securemain args,null createdatanode args,null,resources 建立datanode instantiatedatanode args,conf,resources getstoragelocatio...
client讀寫hdfs的原始碼分析總結
週末花了一天的時間仔細了重溫了一下client對hdfs檔案的讀寫過程,總結如下 每次讀寫都是以乙個資料塊的形式來進行的,並且包括資料內容和資料的校驗值。另外,到 namenode 上獲取相應的資訊都是用 rpc來通訊的,而到 datanode 獲取真正的資料塊內容是由 socket 的網路流來進行...