分布式tensorflow底層的通訊是grpc。grpc首先是乙個rpc,即遠端過程呼叫,通俗的解釋是:假設你在本機上執行一段**num=add(a,b),它呼叫了乙個過程call,然後返回了乙個值num,你感覺這段**只是在本機上執行的,但實際情況是,本機上的add方法是將引數打包傳送給伺服器,然後伺服器執行伺服器端的add方法,返回的結果再將資料打包返回給客戶端。
tensorflow集群就是一組任務,每個任務就是乙個服務。服務由兩個部分組成,第一部分是master,用於建立session,第二部分是worker,用於執行具體的計算。tensorflow一般將任務分為兩類job:一類叫引數伺服器,parameter server,簡稱為ps,用於儲存tf.variable;一類就是普通任務,稱為worker,用於執行具體的計算。
一般而言,機器學習的引數訓練過程可以劃分為兩個類別:第乙個是根據引數算算梯度,第二個是根據梯度更新引數。對於小規模訓練,資料量不大,引數數量不多,乙個cpu就足夠了,兩類任務都交給乙個cpu來做。對於普通的中等規模的訓練,資料量比較大,引數數量不多,計算梯度的任務負荷較重,引數更新的任務負荷較輕,所以將第一類任務交給若干個cpu或gpu去做,第二類任務交給乙個cpu即可。對於超大規模的訓練,資料量大、引數多,不僅計算梯度的任務要部署到多個cpu或gpu上,而且更新引數的任務也要部署到多個cpu。如果計算量足夠大,一台機器能搭載的cpu和gpu數量有限,就需要多台機器來進行計算能力的擴充套件了。引數伺服器是一套分布式儲存,用於儲存引數,並提供引數更新的操作。
每個任務用乙個ip:port表示。tensorflow用tf.train.clusterspec表示乙個集群資訊。
import tensorflow as tf
cluster = tf.train.clusterspec()
上面的語句提供了乙個tensorflow集群資訊,集群有兩類任務,稱為job,乙個job是ps,乙個job是worker;ps由2個任務組成,worker由3個任務組成。
def main(_):
server = tf.train.server(
cluster,
job_name=flags.job_name,
task_index=flags.task_index
)server.join()
if __name__ == "__main__":
乙個tf.train.server包含了:本地裝置(gpus,cpus)的集合,可以連線到其它task的ip:port(儲存在cluster中),還有乙個session target用來執行分布操作。還有最重要的一點就是,它建立了乙個伺服器監聽port埠,如果有資料傳過來,他就會在本地執行(啟動session target,呼叫本地裝置執行運算),然後結果返回給呼叫者。
with tf.device("/job:ps/task:0"):
weights_1 = tf.variable(...)
biases_1 = tf.variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.variable(...)
biases_2 = tf.variable(...)
with tf.device("/job:worker/task:0"): #對映到主機(10.1.1.1)上去執行
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
with tf.device("/job:worker/task:1"): #對映到主機(10.1.1.2)上去執行
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.session("grpc:") as sess: #在主機(10.1.1.2)上執行run
for _ in range(10000):
sess.run(train_op)
with tf.session("grpc://..")是指定gprc://..為master,master將op分發給對應的task。
同步更新指的是:各個用於平行計算的電腦,計算完各自的batch 後,求取梯度值,把梯度值統一送到ps服務機器中,由ps服務機器求取梯度平均值,更新ps伺服器上的引數。如下圖所示,可以看成有四台電腦,第一台電腦用於儲存引數、共享引數、共享計算,可以簡單的理解成記憶體、計算共享專用的區域,也就是ps job;另外三颱電腦用於平行計算的,也就是worker task。
這種計算方法存在的缺陷是:每一輪的梯度更新,都要等到a、b、c三颱電腦都計算完畢後,才能更新引數,也就是迭代更新速度取決與a、b、c三颱中,最慢的那一台電腦。
非同步更新指的是:ps伺服器收到只要收到一台機器的梯度值,就直接進行引數更新,無需等待其它機器。
這種迭代方法比較不穩定,收斂曲線震動比較厲害,因為當a機器計算完更新了ps中的引數,可能b機器還是在用上一次迭代的舊版引數值。
in-graph模式下資料分發在乙個節點上。這種方式配置簡單,其他結算節點只需join操作,暴露乙個網路介面,等在那裡接受任務就好。但壞處就是訓練資料的分發在乙個節點上,要把訓練資料分到不同的機器上,嚴重影響了併發的訓練速度。
between-graph模式下,訓練的引數儲存在引數伺服器,資料不用分發,資料分片的儲存在各個計算節點,各個計算節點自己算自己的,算完後把要更新的引數告訴引數伺服器,引數伺服器更新引數。這種模式的優點是不用進行訓練資料的分發,尤其資料量在tb級的時候,節省了大量的時間,所以大資料深度學習推薦使用between-graph模式。
上面介紹的是直接使用tensorflow進行手工配置,通過修改部分**即可實現分布式計算的過程。通過調研發現,yahoo於2023年開源了基於spark的tensorflow,使用executor執行worker和ps task。tensorflowonspark為apache hadoop和apache spark 集群帶來了可擴充套件的深度學習。通過結合深度學習框架tensorflow和大資料框架apache spark和apache hadoop的顯著特性,tensorflowonspark可在gpu和cpu伺服器集群上實現分布式深度學習。
tensorflowonspark在apache spark集群上啟用分布式tensorflow訓練和推理。它旨在最大限度地減少在共享網格上執行現有tensorflow程式所需的**更改量。
tensorflowonspark具有以下優勢:
tensorflowonspark開源時間較短,未得到充分驗證,並且安裝配置過程十分艱難(網路資料顯示,使用yahoo提供的配置過程無法完成集群搭建任務,會遇到各種坑)。
TensorFlow分布式實踐
大資料時代,基於單機的建模很難滿足企業不斷增長的資料量級的需求,開發者需要使用分布式的開發方式,在集群上進行建模。而單機和分布式的開發 有一定的區別,本文就將為開發者們介紹,基於tensorflow進行分布式開發的兩種方式,幫助開發者在實踐的過程中,更好地選擇模組的開發方向。分布式開發會涉及到更新梯...
TensorFlow分布式實踐
大資料時代,基於單機的建模很難滿足企業不斷增長的資料量級的需求,開發者需要使用分布式的開發方式,在集群上進行建模。而單機和分布式的開發 有一定的區別,本文就將為開發者們介紹,基於tensorflow進行分布式開發的兩種方式,幫助開發者在實踐的過程中,更好地選擇模組的開發方向。基於tensorflow...
分布式計算
定義 研究如何把乙個需要非常巨大的計算能力才能解決的問題分成許多小的部分,然後把這些部分分配給許多計算機進行處理,最後把這些計算結果綜合起來得到最終的結果。什麼是分布式系統 分布式系統 distributed system 是由多台計算機和通訊的軟體通過計算機網路連線組成 本地區域網或者廣域網 分布...