基本思路:1、對資料分塊,使用多個worker分別處理乙個資料塊,每個worker暴露兩個介面,分別是損失計算的介面loss和梯度計算的介面grad;
2、同時定義full_loss和full_grad介面對每個worker的loss和grad進行聚合;
3、使用bfgs演算法進行引數優化,
分別使用full_loss和full_grad作為bfgs的損失函式和梯度函式,即可進行網路引數優化;
注意:在此實現中,每個worker內部每次均計算乙個資料塊上的損失和梯度,而非乙個batch
。#0、匯入依賴
import
numpy as np
import
osimport
scipy.optimize
import
tensorflow as tf
from tensorflow.examples.tutorials.mnist import
input_data
import
rayimport
ray.experimental.tf_utils
#1、定義模型
class
linearmodel(object):
def__init__
(self, shape):
"""creates a linearmodel object.
"""x =tf.placeholder(tf.float32, [none, shape[0]])
w =tf.variable(tf.zeros(shape))
b = tf.variable(tf.zeros(shape[1]))
self.x =x
self.w =w
self.b =b
y = tf.nn.softmax(tf.matmul(x, w) +b)
y_ = tf.placeholder(tf.float32, [none, shape[1]])
self.y_ =y_
cross_entropy =tf.reduce_mean(
-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
self.cross_entropy =cross_entropy
self.cross_entropy_grads =tf.gradients(cross_entropy, [w, b])
self.sess =tf.session()
self.variables =ray.experimental.tf_utils.tensorflowvariables(
cross_entropy, self.sess)
defloss(self, xs, ys):
"""計算loss
"""return
float(
self.sess.run(
self.cross_entropy, feed_dict=))
defgrad(self, xs, ys):
"""計算梯度
"""return
self.sess.run(
self.cross_entropy_grads, feed_dict=)
#2、定義遠端worker,用於計算模型loss、grads
@ray.remote
class
netactor(object):
def__init__
(self, xs, ys):
os.environ[
"cuda_visible_devices
"] = ""
with tf.device(
"/cpu:0"):
self.net = linearmodel([784, 10])
self.xs =xs
self.ys =ys
#計算乙個資料塊的loss
defloss(self, theta):
net =self.net
net.variables.set_flat(theta)
return
net.loss(self.xs, self.ys)
#計算乙個資料塊的梯度
defgrad(self, theta):
net =self.net
net.variables.set_flat(theta)
gradients =net.grad(self.xs, self.ys)
return np.concatenate([g.flatten() for g in
gradients])
defget_flat_size(self):
return
self.net.variables.get_flat_size()
#3、獲取遠端worker損失的函式
deffull_loss(theta):
theta_id =ray.put(theta)
loss_ids = [actor.loss.remote(theta_id) for actor in
actors]
return
sum(ray.get(loss_ids))
#4、獲取遠端worker梯度的函式
deffull_grad(theta):
theta_id =ray.put(theta)
grad_ids = [actor.grad.remote(theta_id) for actor in
actors]
#使用fmin_l_bfgs_b須轉換為float64資料型別
return sum(ray.get(grad_ids)).astype("
float64")
#5、使用lbfgs進行訓練
if__name__ == "
__main__":
ray.init()
mnist = input_data.read_data_sets("
mnist_data
", one_hot=true) #
資料分塊,每個worker跑乙個資料塊
num_batches = 10batch_size = mnist.train.num_examples //num_batches
batches = [mnist.train.next_batch(batch_size) for _ in
range(num_batches)]
actors = [netactor.remote(xs, ys) for (xs, ys) in
batches] #
引數初始化
dim =ray.get(actors[0].get_flat_size.remote())
theta_init = 1e-2 * np.random.normal(size=dim) #
優化 result =scipy.optimize.fmin_l_bfgs_b(
full_loss, theta_init, maxiter=10, fprime=full_grad, disp=true)
分布式機器學習第3章 分布式機器學習框架
q 需要使用到分布式機器學習有哪三種情形?q 對於計算量太大時的分布式機器學習解決辦法 q 對於訓練資料太多時的分布式機器學習解決辦法 q 對於模型規模太大時的分布式機器學習解決辦法 q 目前分布式機器學習領域的主要矛盾是?q 分布式機器學習的主要組成模組有哪四個?q 分布式機器學習的資料劃分中,對...
分布式機器學習dask
分布式機器學習 dask是乙個資料分析的平行計算的框架。pip安裝 pip install dask compete install everything pip install dask install only core cluster 部署 安裝dask 1.2.2 conda install...
分布式機器學習主要筆記
mahout是hadoop的乙個機器學習庫,主要的程式設計模型是mapreduce 每個企業的資料都是多樣的和特別針對他們需求的。然而,在對那些資料的分析種類上卻沒多少多樣性。mahout專案是實施普通分析計算的乙個hadoop庫。用例包括使用者協同過濾 使用者建議 聚類和分類。mllib 執行在s...