tensorflow的session物件是可以支援多執行緒的,因此多個執行緒可以很方便地使用同乙個會話(session)並且並行地執行操作。然而,在python程式實現這樣的並行運算卻並不容易。所有執行緒都必須能被同步終止,異常必須能被正確捕獲並報告,回話終止的時候, 佇列必須能被正確地關閉。
所幸tensorflow提供了兩個類來幫助多執行緒的實現:tf.coordinator和tf.queuerunner。從設計上這兩個類必須被一起使用。coordinator類可以用來同時停止多個工作執行緒並且向那個在等待所有工作執行緒終止的程式報告異常。queuerunner類用來協調多個工作執行緒同時將多個張量推入同乙個佇列中。
fifoqueue 建立乙個先入先出佇列。修改佇列的主要狀態有:enqueuemany、enqueue 和dequeue。其中enqueuemany用於初始化佇列,enqueue用於入隊,dequeue用於出隊。直接上**:
執行結果為:0.3,1.1,1.2
#建立乙個先入先出大小為3的佇列,
q = tf.fifoqueue(
3, "float")
#使用enqueue_many函式初始化佇列,插入 0.1、 0.2、 0.3 三個數字
init = q.enqueue_many(([
0.1,
0.2,
0.3],))
# 使用dequeue函式定義出列,之後+1,再使用enqueue進行入列
x = q.dequeue()
y = x +
1q_inc = q.enqueue([y])
with tf.session()
as sess:
sess.run(init)
quelen = sess.run(q.size())
for i
in range(
2):sess.run(q_inc)
# 執行2次入隊操作,佇列中的值變為 0.3,1.1,1.2,入隊操作不能print,print出來為none
quelen = sess.run(q.size())
for i
in range(quelen):
print (sess.run(q.dequeue()))
# 輸出佇列的值
randomshufflequeue 建立乙個隨機佇列,在出佇列時,是以隨機的順序產生元素的。例如,我們在訓練一些影象樣本時,使用 cnn 的網路結構,希望可以無序地讀入訓練樣本,就要用randomshufflequeue,每次隨機產生乙個訓練樣本。
randomshufflequeue在tensorflow 使用非同步計算時非常重要。因為 tensorflow 的會話是支援多執行緒的,我們可以在主線程裡執行訓練操作,使用 randomshufflequeue 作為訓練輸入,開多個執行緒來準備訓練樣本,將樣本壓入佇列後,主線程會從佇列中每次取出 mini-batch 的樣本進行訓練。
執行結果是亂序的。
q = tf.randomshufflequeue(capacity=
10, min_after_dequeue=
2, dtypes=
"float")
sess = tf.session()
for i
in range(
0, 10):
#10 次入隊
sess.run(q.enqueue(i))
for i
in range(
0, 8):
# 8 次出隊
print(sess.run(q.dequeue()))
coordinator類可以用來同時停止多個工作執行緒並且向那個在等待所有工作執行緒終止的程式報告異常,其主要方法有:
在啟動執行緒之前首先建立乙個coordinator類,然後將這個類傳入每乙個建立的執行緒中。這些執行緒通常一直迴圈執行,一直到should_stop()返回true時停止。 任何執行緒都可以決定計算什麼時候應該停止。它只需要呼叫request_stop(),同時其他執行緒的should_stop()將會返回true,然後都停下來。
**如下:
上面講的coordinator是用於協調多個執行緒的,在此之前我們首先要啟動多個執行緒。佇列管理器(queuerunner)就是用來啟動(建立)多個執行緒來操作同乙個佇列。
import tensorflow
as tf
import numpy
as np
import threading
import time
#執行緒中執行的程式,這個程式每間隔1s判斷是否需要停止並列印自己的id
defmyloop
(coord, worker_id):
#使用tf.coordinator類提供的協同工具判斷當前執行緒是否需要停止
while
not coord.should_stop():
#隨機停止所有的執行緒
if np.random.rand() <
0.1:
print (
'stoping from id: %d\n' % worker_id)
coord.request_stop()
else:
#列印當前執行緒的id
print (
'working on id: %d\n' % worker_id)
time.sleep(
1)
#宣告乙個tf.train.coordinator類來協同多個執行緒
coord = tf.train.coordinator()
#宣告建立5個執行緒
threads = [threading.thread(target = myloop, args = (coord,i,))
for i
in range(
5)]#啟動所有的執行緒
for t
in threads:
t.start()
coord.join(threads)
下面是將coordinator和queuerunner結合起來的**:
簡單來講,就是首先通過tf.fifoqueue或者tf.randomshufflequeue建立乙個佇列,然後通過queuerunner來執行幾個執行緒, 這幾個執行緒處理樣本,並且將樣本推入佇列。在通過coordinator,讓queue runner使用coordinator來啟動這些執行緒,建立乙個訓練的迴圈,並且使用coordinator來控制queuerunner的執行緒們的終止。
#宣告乙個先進先出的佇列,佇列中最多有100個元素,型別為實數
queue = tf.fifoqueue(
100,
"float")
#定義佇列的入隊操作
enqueue_op = queue.enqueue([tf.random_normal([
1])])
#使用tf.train.queuerunner來建立多個執行緒執行佇列的入隊操作
#tf.train.queuerunner的第乙個引數給了被操作的佇列
#[enqueue_op]*5表示需要啟動5個執行緒,每乙個執行緒中執行的是enqueue_op操作
qr = tf.train.queuerunner(queue,[enqueue_op] *
5)#出隊操作
out_tensor = queue.dequeue()
with tf.session()
as sess:
coord = tf.train.coordinator()
#執行緒協調器,用於協同啟動的執行緒
threads = qr.create_threads(sess, coord = coord, start=
true)
for _
in range(
3):print (sess.run(out_tensor)[
0])coord.request_stop()
coord.join(threads)
tensorflow的session物件是可以支援多執行緒的,因此多個執行緒可以很方便地使用同乙個會話(session)並且並行地執行操作。然而,在python程式實現這樣的並行運算卻並不容易。所有執行緒都必須能被同步終止,異常必須能被正確捕獲並報告,回話終止的時候, 佇列必須能被正確地關閉。
多執行緒 理解多執行緒(一)
程序 程序是cpu分配資源的基本單位 執行緒 執行緒是cpu排程的基本單位 資源分配給程序,所有執行緒共享該程序的資源 當執行緒數大於cpu的數量,會出現時間片的輪詢。cpu時間片是直接分配給執行緒的,執行緒拿到cpu時間片就能執行了 cpu時間片不是先分給程序然後再由程序分給程序下的執行緒的。所有...
多執行緒理解
本文介紹我對於多執行緒的理解。一 概念 1.作業系統下的多程序場景 但其實上面的情景是屬於作業系統下的多程序,不是單個程式內的多程序,這種多程序跟我們在單個程式內的多執行緒沒有什麼可以比較的。2.執行緒 舉個最簡單例子 我們的任務需要兩個計算型操作 a b 每個操作耗時10秒,如果是單執行緒,這時我...
多執行緒理解
單核cpu 單執行緒與多執行緒 執行時都是併發操作 a.執行條件 無io等操作,時間大小 多執行緒 單執行緒,原因 多執行緒執行時執行緒切換耗時間 b.執行條件 有io等操作,時間大小 單執行緒 多執行緒,原因 單執行緒要等待io的操作時間,從而加長時間 單核多執行緒的作用 在b中展現出優點。多核c...