在tensorflow中,佇列和變數類似,都是計算圖上有狀態的節點。其它的計算節點可以修改它們的狀態。對於變數,可以通過賦值操作修改變數的取值。對於佇列,修改佇列狀態的操作主要有enqueue、enqueuemany和dequeue:
import tensorflow as tf
# 建立乙個先進先出佇列,指定佇列中最多可以儲存兩個元素,並指定型別為整數
q = tf.fifoqueue(2, "int32")
# 使用enqueue_many函式來初始化佇列中的元素。和初始化變數類似,在使用佇列之前需要明確的呼叫這個初始化過程
init = q.enqueue_many([0, 10],)
# 使用dequeue函式將佇列中的第乙個元素出佇列,這個元素的值將被存在變數x中
x = q.dequeue()
y = x+1
q_inc = q.enqueue([y])
with tf.session() as sess:
init.run()
for _ in range(5):
v, _ = sess.run([x, q_inc])
print v
# 0,10,1,11,2
tensorflow中提供了fifoqueue(先進先出)和randomshufflequeue(將佇列中的元素打亂,每次出佇列操作得到的是從當前佇列所有元素中隨機選擇的乙個)兩種佇列。
在tensorflow中,佇列不僅僅是一種資料結構,還是非同步計算張量取值的乙個重要機制。比如多個執行緒可以同時向乙個佇列中寫元素,或者同時讀取乙個佇列中的元素。
tensorflow提供了tf.coordinator和tf.queuerunner兩個類來完成多執行緒協同的功能。tf.coordinator主要用於協同多個執行緒一起停止,並提供了should_stop、request_stop和join三個函式。在啟動執行緒之前,需要先宣告乙個tf.coordinator類,並將這個類傳入每乙個建立的執行緒中。啟動的執行緒需要一直查詢tf.coordinator類中提供的should_stop函式,當這個函式的返回值為true時,則當前執行緒也需要退出。每乙個啟動的執行緒都可以通過呼叫request_stop函式來通知其他執行緒退出。當某乙個執行緒呼叫request_stop函式之後,should_stop函式的返回值將被設定為true,這樣其他的執行緒就可以同時終止了,以下程式展示了如何使用tf.coordinator
import tensorflow as tf
import numpy as np
import threading
import time
# 執行緒中執行的程式,這個程式每隔1秒判斷是否需要停止並列印自己的id
def myloop(coord, worker_id):
# 使用tf.coordinator類提供的協同工具判斷當前執行緒是否需要停止
while not coord.should_stop():
# 隨機停止所有的執行緒
if np.random.rand() < 0.1:
coord.request_stop()
else:
print worker_id
time.sleep(1) #暫停1秒
# 宣告乙個tf.train.coordinator類來協同多個執行緒
coord = tf.train.coordinator()
# 宣告建立5個執行緒
threads = [threading.thread(target=myloop, args=(coord, i, )) for i in xrange(5)]
# 啟動所有的執行緒
for t in threads:
t.start()
# 等待所有執行緒退出
coord.join(threads)
當所有執行緒啟動之後,每個執行緒會列印各自的id,然後暫停1秒之後,所有執行緒又開始第二遍列印id。。。
tf.queuerunner主要用於啟動多個執行緒來操作同乙個佇列,啟動的這些執行緒可以通過tf.coordinator類來統一管理。以下**展示了如何使用tf.queuerunner和tf.coordinator來管理多執行緒佇列操作
import tensorflow as tf
queue = tf.fifoqueue(100, "float")
enqueue_op = queue.enqueue([tf.random_noamal([1])])
# 使用tf.train.queuerunner來建立多個執行緒執行佇列的入隊操作
# tf.train.queuerunner的第乙個引數給出了被操作的佇列,[enqueue_op]*5表示需要啟動5個執行緒,每個執行緒中執行的是enqueue_op操作
qr = tf.train.queuerunner(queue, [enqueue_op]*5)
# 將定義過的queuerunner加入tensorflow計算圖上指定的集合,tf.train.add_queue_runner函式沒有指定集合,則加入預設集合tf.graphkeys.queue_runners
tf.train.add_queue_runner(qr)
# 定義出佇列操作
out_tensor = queue.dequeue()
with tf.session() as sess:
# 使用tf.train.coordinator來協同啟動的執行緒
coord = tf.train.coordinator
# 使用tf.train.queuerunner時,需要明確呼叫tf.train.start_queue_runners來啟動所有執行緒。否則因為沒有執行緒執行入隊操作,當呼叫出隊操作時,程式會一直預設等待入隊操作被執行。tf.train.start_queue_runners函式會預設啟動tf.graphkeys.queue_runners集合中所有的queuerunner,因為這個函式只支援啟動指定集合中的queuerunner,所以一般來說tf.train.add_queue_runner函式和tf.train.start_quque_runners函式會指定同乙個集合。
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 獲取佇列中的取值
for _ in range(3):
print sess.run(out_tensor)[0]
# 使用tf.train.coordinator來停止所有的執行緒
coord.request_stop()
coord.join(threads)
tensorflow提供了tf.train.batch和tf.train.shuffle_batch函式來將單個的樣例組織成batch的樣式輸出。這兩個函式都會生成乙個佇列,佇列的入隊操作是生成單個樣例的方法,而每次出對得到的是乙個batch的樣例。
import tensorflow as tf
example, label = features['i'], features['j']
batch_size = 3
# 組合樣例的佇列中最多可以儲存的樣例個數。佇列如果太大,那麼需要占用很多記憶體資源;如果太小,那麼出隊操作可能會因為沒有資料而被阻礙,從而導致訓練效率降低。一般來說這個佇列的大小會和每乙個batch的大小相關
capacity = 1000 + 3 * batch_size
# 使用tf.train.batch函式來組合樣例。
example_batch, label_batch = tf.train.batch([example,label], batch_size=batch_size, capacity=capacity)
# 當佇列長度等於容量時,tensorflow將暫停入隊操作,而只等待元素出隊。當元素數小於容量時,tensorflow將自動重新啟動入隊操作。
# 使用tf.train.shuffle_batch函式來組合樣例。
# tf.train.shuffle_batch函式的引數大部分都和tf.train.batch函式相似,但是min_after_dequeue引數是tf.train.shuffle_batch特有的,min_atfer_dequeue引數限制了出隊時佇列中元素的最小個數。當佇列中元素太少時。隨機打亂樣例的作用就不大了。當出隊函式被呼叫但是佇列中元素不夠時,出隊操作將等待更多的元素入隊才會完成。
example_batch, label_batch = tf.train.shuffle_batch([example,label], batch_size=batch_size, capacity=capacity, min_after_dequeue=30)
with tf.session() as sess:
tf.initialize_all_variables().run()
coord = tf.train.coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
for i in range(2):
cur_example_batch, cur_label_batch = sess.run([example_batch,label_batch])
coord.request_stop()
coord.join(threads)
tf.train.batch和tf.train.shuffle_batch函式除了可以將單個訓練資料整理成輸入batch,也提供了並行化處理輸入資料的方法。tf.train.batch函式和tf.train.shuffle_batch函式並行化的方式一致,通過設定函式中的num_threads引數,可以指定多個執行緒同時執行入隊操作,入隊操作就是讀取資料以及預處理的過程。當需要多個執行緒處理不同檔案中的樣例時,可以使用tf.train.shuffle_batch_join函式,此函式會從輸入檔案佇列中獲取不同的檔案分配給不同的執行緒。 Tensorflow使用多執行緒
tensorflow的session物件支援多執行緒,可以在同乙個session中建立多個執行緒,預設是cpu有多少個核,就啟動多少個執行緒。tensorflow提供了倆個類來實現對session中多執行緒的管理 tf.coordinator和tf.queuerunner,這倆個類必須一起使用。co...
tensorflow 佇列與多執行緒
1 tensorflow資料輸入簡介 為了避免影象預處理成為神經網路模型訓練效率的瓶頸,tensorflow提供了多執行緒處理輸入資料的框架。流程如下 1 指定原始資料的檔案列表 2 建立檔案列表佇列 3 從檔案中讀取資料 4 資料預處理 5 整理成batch作為神經網路輸入 tensorflow中...
TensorFlow佇列與多執行緒
1 tf.coordinatorimport numpy as np import threading import time import tensorflow as tf 執行緒中執行的程式,這個程式每隔1秒判斷是否需要停止並列印自己的id def myloop coord,worker id ...