Tensorflow並行計算
Liu91
2018.08.13 14:09:19
字數 1,294
閱讀 8,724
在真正開始Tensorflow並行運算程式碼實現之前,我們首先瞭解一下Tensorflow系統結構設計是如何完美的支援並行運算的。(參見部落格)
前端:提供程式設計模型,負責構造計算圖
後端:提供執行環境,負責執行計算圖
前端提供各種語言的庫(主要是Python),前端庫基於C API觸發tensorflow後端程式執行。
後端的Distributed Runtime下的Distributed Master根據Session.run()的參數,從計算圖中反向遍歷,找到所依賴的最小子圖,並將最小子圖分裂爲多個子圖片段,派發給Worker Service,啓動子圖片段的執行過程。
Kernel主要包括一些具體操作,如折積操作等。後端的最底層是網路層和裝置層。網路層包括RPC和RDMA,負責傳遞神經網路的參數。
⚠️client、master 和 worker各元件的內部工作原理
Client基於TensorFlow的程式設計介面來構造計算圖,主要爲Python和C++程式設計介面。直到Session(會話)被建立,TensorFlow纔開始工作:Session建立Client和後端執行時(runtime)之間的通道,並將Graph發送給Distributed Master。如下爲Client構建的一個簡單計算Graph:
s = w*x + b
執行Session.run運算時,Master將最小子圖分片派發給Worker Service。如下圖所示,PS(參數伺服器)上放置模型參數,worker上則執行op。
在這個過程中計算圖的邊被任務點分割,Distributed Master會將該邊分裂,並在兩個分佈式任務之間插入send和recv節點,實現數據傳遞。
2.1 數據並行
每個GPU上的模型相同,但餵入的訓練樣本不同(每個GPU處理同一批數據中的不同部分)。
數據並行根據參數更新方式的不同又可以分爲同步數據並行和非同步數據並行。
同步數據並行:每個GPU根據loss計算各自的gradient,彙總所有GPU的gradient,求平均梯度,根據平均梯度更新模型參數,具體過程見下圖。所以同步數據並行的速度取決於最慢的GPU,當各個GPU的效能相差不大時適用。
同步數據並行
非同步數據並行:和同步並行的區別是,不用等所有GPU的梯度,每個GPU均可更新參數。每個GPU每次取到的參數也是最新的。
缺點是:某個GPU取得參數到回寫更新的這段時間裡,參數可能已被其他GPU更新,因此它的梯度是基於過時參數計算的,參數更新容易偏離最優解,訓練不如同步方式穩定。
非同步數據並行
數據並行,速度取決於最慢的GPU和中心伺服器(分發數據,計算平均梯度的cpu/gpu)的快慢。
2.2 模型並行
同一批訓練樣本,將不同的模型計算部分分佈在不同的計算裝置上同時執行
模型並行
模型並行,比如輸入層到隱層的計算放到gpu0上,隱層到輸出層的計算放到gpu1上。初始啓動時gpu1是不工作的,要等gpu0輸出後才能執行。能保證對同一批數據的同步嗎?疑惑點⚠️
多機多卡,即client,master,worker不在同一臺機器上時稱之爲分佈式
3.1 同步數據並行
#!/usr/bin/env python
import tensorflow as tf
from tensorflow.python.client import device_lib
import os
import time
# Hide TensorFlow's C++ INFO and WARNING logs; '2' keeps only errors.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
################# 獲取當前裝置上的所有GPU ##################
def check_available_gpus():
local_devices = device_lib.list_local_devices()
gpu_names = [x.name for x in local_devices if x.device_type == ‘GPU’]
gpu_num = len(gpu_names)
print(’{0} GPUs are detected : {1}’.format(gpu_num, gpu_names))
return gpu_num # 返回GPU個數
os.environ[‘CUDA_VISIBLE_DEVICES’] = ‘12, 13, 14, 15’
N_GPU = 4 # 定義GPU個數
BATCH_SIZE = 100*N_GPU
LEARNING_RATE = 0.001
EPOCHS_NUM = 1000
NUM_THREADS = 10
MODEL_SAVE_PATH = ‘data/tmp/logs_and_models/’
MODEL_NAME = ‘model.ckpt’
DATA_PATH = ‘data/test_data.tfrecord’
def _parse_function(example_proto):
    """Parse one serialized ``tf.train.Example`` into float tensors.

    The feature spec expects a 5-element int64 feature ``'sample'`` and a
    scalar int64 feature ``'label'``; both are cast to ``tf.float32``.

    Returns:
        A dict with keys ``'sample'`` (shape [5]) and ``'label'`` (scalar).
    """
    dics = {
        'sample': tf.FixedLenFeature([5], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64),
    }
    parsed_example = tf.parse_single_example(example_proto, dics)
    parsed_example['sample'] = tf.cast(parsed_example['sample'], tf.float32)
    parsed_example['label'] = tf.cast(parsed_example['label'], tf.float32)
    return parsed_example
def _get_data(tfrecord_path=DATA_PATH, num_threads=NUM_THREADS, num_epochs=EPOCHS_NUM,
              batch_size=BATCH_SIZE, num_gpu=N_GPU):
    """Build the tf.data input pipeline and split each batch across GPUs.

    Reads ``tfrecord_path`` and parses records with ``num_threads`` parallel
    calls. It then shuffles, repeats the data ``num_epochs`` times, and batches
    by ``batch_size`` (the global batch). Each batch is split into ``num_gpu``
    equal slices, one per GPU tower, so ``batch_size`` must be a multiple of
    ``num_gpu``.

    Returns:
        (x_split, y_split): lists of ``num_gpu`` sample / label tensors.
    """
    dataset = tf.data.TFRecordDataset(tfrecord_path)
    # Parse records in parallel.
    new_dataset = dataset.map(_parse_function, num_parallel_calls=num_threads)
    new_dataset = new_dataset  # (kept for readability of the pipeline stages)
    shuffle_dataset = new_dataset.shuffle(buffer_size=10000)  # shuffle sample order
    repeat_dataset = shuffle_dataset.repeat(num_epochs)  # how many passes over the data
    # drop_remainder: tf.split below requires the batch to divide evenly by
    # num_gpu, so a smaller final batch could make the step fail.
    # (drop_remainder needs TensorFlow >= 1.10.)
    batch_dataset = repeat_dataset.batch(batch_size=batch_size, drop_remainder=True)
    iterator = batch_dataset.make_one_shot_iterator()  # no explicit initialization needed
    next_element = iterator.get_next()
    x_split = tf.split(next_element['sample'], num_gpu)
    y_split = tf.split(next_element['label'], num_gpu)
    return x_split, y_split
def _init_parameters():
    """Create the parameters of the 5 -> 10 -> 1 network.

    Uses ``tf.get_variable`` with seeded normal initializers (mean 0,
    stddev 1), so the same variables can be passed to every GPU tower.

    Returns:
        (w1, w2, b1, b2) -- note the order: both weights first, then biases.
    """
    normal = tf.random_normal_initializer
    w1 = tf.get_variable('w1', shape=[5, 10], initializer=normal(mean=0, stddev=1, seed=9))
    b1 = tf.get_variable('b1', shape=[10], initializer=normal(mean=0, stddev=1, seed=1))
    w2 = tf.get_variable('w2', shape=[10, 1], initializer=normal(mean=0, stddev=1, seed=0))
    b2 = tf.get_variable('b2', shape=[1], initializer=normal(mean=0, stddev=1, seed=2))
    return w1, w2, b1, b2
def average_gradients(tower_grads):
    """Average gradients across GPU towers for synchronous data parallelism.

    Args:
        tower_grads: one list per GPU, each holding the ``(gradient, variable)``
            pairs returned by ``Optimizer.compute_gradients``. Every list must
            contain the same variables in the same order, which is what you
            get when all towers share the variables.

    Returns:
        A list of ``(average_gradient, variable)`` pairs for
        ``Optimizer.apply_gradients``. ``None`` gradients are ignored; a
        variable with no gradient on any tower is left out.
    """
    avg_grads = []
    # zip(*tower_grads) regroups the pairs per variable: grad_and_vars holds
    # one variable's (gradient, variable) pair from every GPU, e.g. all the
    # gradients of w1.
    for grad_and_vars in zip(*tower_grads):
        grads = [g for g, _ in grad_and_vars if g is not None]
        if not grads:
            continue
        # Stack on a new leading "GPU" axis, e.g. 4 x (5, 10) -> (4, 5, 10),
        # then average over that axis.
        grad = tf.reduce_mean(tf.stack(grads, axis=0), axis=0)
        # The towers share variables, so the first pair's variable is the
        # same variable as in every other pair.
        var = grad_and_vars[0][1]
        avg_grads.append((grad, var))
    return avg_grads
# ---------- Synchronous data parallelism: build one tower per GPU ----------
w1, w2, b1, b2 = _init_parameters()
x_split, y_split = _get_data()
opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
tower_grads = []
for i in range(N_GPU):
    with tf.device("/gpu:%d" % i):
        # Each tower uses the shared variables but its own slice of the batch.
        y_hidden = tf.nn.relu(tf.matmul(x_split[i], w1) + b1)
        y_out = tf.matmul(y_hidden, w2) + b2
        y_out = tf.reshape(y_out, [-1])
        cur_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            logits=y_out, labels=y_split[i], name=None)
        # Reduce the per-example losses to their mean. compute_gradients on a
        # loss vector implicitly differentiates its sum, which would make the
        # gradient scale with the per-GPU batch size.
        cur_loss = tf.reduce_mean(cur_loss)
        # Only compute gradients here; nothing is applied to the variables yet.
        grads = opt.compute_gradients(cur_loss)
        tower_grads.append(grads)
    # Debug session: evaluate the gradients built so far and print b1, to show
    # that computing gradients on each GPU does not update the parameters.
    with tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                          log_device_placement=False)) as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        sess.run(tower_grads)
        print('=============== parameter test sy =========')
        print(i)
        print(sess.run(b1))
        coord.request_stop()
        coord.join(threads)

# Average the tower gradients and apply them in one update per step.
grads = average_gradients(tower_grads)
apply_gradient_op = opt.apply_gradients(grads)
grad_tensors = [grad for grad, _ in grads]

with tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                      log_device_placement=True)) as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    for step in range(1000):
        should_log = step != 0 and step % 100 == 0
        start_time = time.time()
        if should_log:
            # Fetch the averaged gradients in the same run as the update, so
            # the printed values belong to the batch just trained on. A
            # separate sess.run would pull another batch from the iterator.
            _, grad_values = sess.run([apply_gradient_op, grad_tensors])
        else:
            sess.run(apply_gradient_op)
        duration = time.time() - start_time
        if should_log:
            # BATCH_SIZE is already the global batch (100 * N_GPU) consumed
            # by one step, so it must not be multiplied by N_GPU again.
            num_examples_per_step = BATCH_SIZE
            examples_per_sec = num_examples_per_step / duration
            sec_per_batch = duration / N_GPU  # time per per-GPU sub-batch
            print('step:', step, grad_values, examples_per_sec, sec_per_batch)
            print('=======================parameter b1============ :')
            print(sess.run(b1))
    coord.request_stop()
    coord.join(threads)
計算所有gpu的平均梯度,再更新參數。
計算所有gpu平均損失函數,用梯度更新參數
各個gpu得到新參數,再平均更新參數,這樣有沒有影響,參數暫時怎麼存放等。
1和2的結果理論上應該是相同的:求導是線性運算,當各GPU分到的樣本數相同時,平均各GPU的梯度就等於對平均損失求梯度。
3.2 非同步數據並行
def _model_nn(w1, w2, b1, b2, x_split, y_split, i_gpu):
    """Build one tower whose train op updates the model parameters directly.

    This is the asynchronous variant. It defines the training model and uses
    ``optimizer.minimize()`` to update the parameters directly: ``minimize``
    both computes AND applies this tower's gradient, with no averaging across
    GPUs, so each tower updates ``w1, w2, b1, b2`` on its own.

    Args:
        w1, w2, b1, b2: model variables shared by all towers.
        x_split, y_split: per-GPU slices of the current batch.
        i_gpu: index of the slice this tower trains on.

    Returns:
        The training op returned by ``GradientDescentOptimizer.minimize``.
    """
    y_hidden = tf.nn.relu(tf.matmul(x_split[i_gpu], w1) + b1)
    y_out = tf.matmul(y_hidden, w2) + b2
    y_out = tf.reshape(y_out, [-1])
    loss = tf.nn.sigmoid_cross_entropy_with_logits(
        logits=y_out, labels=y_split[i_gpu], name=None)
    # Reduce to the mean so the step size does not scale with the slice size.
    # minimize() on a loss vector implicitly differentiates its sum.
    loss = tf.reduce_mean(loss)
    opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
    train = opt.minimize(loss)
    return train
# ---------- Asynchronous data parallelism ----------
w1, w2, b1, b2 = _init_parameters()
x_split, y_split = _get_data()
# Keep every tower's train op. Previously `train` was overwritten on each
# iteration, so only the last GPU's slice was ever trained on.
train_ops = []
for i in range(N_GPU):
    with tf.device("/gpu:%d" % i):
        train = _model_nn(w1, w2, b1, b2, x_split, y_split, i)
        train_ops.append(train)
    # Debug session: run this tower's train op once and print b1, to check
    # that each tower's op updates the variables.
    with tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                          log_device_placement=False)) as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        sess.run(train)
        print('=============== parameter test Asy =========')
        print(i)
        print(sess.run(b1))
        coord.request_stop()
        coord.join(threads)

with tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                      log_device_placement=False)) as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    for step in range(2):
        # Run every tower's update op. Each applies its own gradient to the
        # shared variables; no averaging across GPUs takes place.
        sess.run(train_ops)
        print('======================= parameter b1 ================ :')
        print('step:', step, sess.run(b1))
    coord.request_stop()
    coord.join(threads)
tf.name_scope()主要是與Variable配合,方便參數命名管理
tf.variable_scope與tf.get_variable配合使用,實現變數共用
tf.name_scope名稱空間是便於管理變數,不同名稱空間下的用Variable定義的變數名允許相同。可以理解爲名字相同,但是姓(名稱空間)不同,指向的也是不同變數。
而tf.get_variable()定義的變數不受tf.name_scope的影響(主要用於共用變數,避免在大型網路結構中定義過多的模型參數)。
我們主要看tf.variable_scope()的使用。
# Signature of tf.variable_scope in TensorFlow 1.x, listed for reference only;
# the names below are its parameters, not runnable code.
tf.variable_scope(
name_or_scope, # the scope name (or an existing scope object)
default_name=None,
values=None,
initializer=None,
regularizer=None,
caching_device=None,
partitioner=None,
custom_getter=None,
reuse=None, # True, None, or tf.AUTO_REUSE;
# if True, we go into reuse mode for this scope as well as all sub-scopes;
# if tf.AUTO_REUSE, we create variables if they do not exist, and return them otherwise;
# if None, we inherit the parent scope's reuse flag. When eager execution is enabled, new variables are always created unless an EagerVariableStore or template is currently active.
dtype=None,
use_resource=None,
constraint=None,
auxiliary_name_scope=True
)
tf.variable_scope()可以理解爲從某個name的籃子裡取東西。在這個籃子裡,只要名字相同,下次就可以反覆地使用這個變數。
tf.variable_scope()可以節省記憶體,官網的例子:
import tensorflow as tf
def my_image_filter():
    """Create two conv layers' weights and biases with ``tf.Variable``.

    ``tf.Variable`` always creates a new variable; on a name clash the name is
    suffixed (``conv1_weights_1`` ...), as the printed output shows. Every call
    therefore adds four more variables to the graph. Returns ``None``.
    """
    conv1_weights = tf.Variable(tf.random_normal([5, 5, 32, 32]),
                                name="conv1_weights")
    conv1_biases = tf.Variable(tf.zeros([32]), name="conv1_biases")
    conv2_weights = tf.Variable(tf.random_normal([5, 5, 32, 32]),
                                name="conv2_weights")
    conv2_biases = tf.Variable(tf.zeros([32]), name="conv2_biases")
    return
# Calling the function twice creates two independent sets of variables.
result1 = my_image_filter()
result2 = my_image_filter()
vs = tf.trainable_variables()
print('There are %d train_able_variables in the Graph: ' % len(vs))
for v in vs:
    print(v)
這是官網上的例子,從輸出可以看出,調用my_image_filter()兩次會建立8個變數:
There are 8 train_able_variables in the Graph:
<tf.Variable ‘conv1_weights:0’ shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable ‘conv1_biases:0’ shape=(32,) dtype=float32_ref>
<tf.Variable ‘conv2_weights:0’ shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable ‘conv2_biases:0’ shape=(32,) dtype=float32_ref>
<tf.Variable ‘conv1_weights_1:0’ shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable ‘conv1_biases_1:0’ shape=(32,) dtype=float32_ref>
<tf.Variable ‘conv2_weights_1:0’ shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable ‘conv2_biases_1:0’ shape=(32,) dtype=float32_ref>
如果用tf.variable_scope()共用變數會怎麼樣呢?
import tensorflow as tf
def conv_relu(kernel_shape, bias_shape):
    """Create (or, under a reusing scope, fetch) a layer's weights and biases.

    ``tf.get_variable`` names the variables ``weights`` and ``biases`` inside
    the enclosing ``tf.variable_scope``. This trimmed-down version builds no
    convolution and returns ``None``; it only demonstrates variable sharing.
    """
    # Create variable named "weights".
    weights = tf.get_variable("weights", kernel_shape,
                              initializer=tf.random_normal_initializer())
    # Create variable named "biases", initialized to zero.
    biases = tf.get_variable("biases", bias_shape,
                             initializer=tf.constant_initializer(0.0))
    return
def my_image_filter():
    """Build two layers, each in its own variable scope.

    The variables are named ``conv1/weights``, ``conv1/biases``,
    ``conv2/weights`` and ``conv2/biases``, relative to any enclosing scope.
    Returns whatever the second ``conv_relu`` call returns.
    """
    # One variable scope per layer: clear, hierarchical names.
    with tf.variable_scope("conv1"):
        # Variables created here will be named "conv1/weights", "conv1/biases".
        conv_relu([5, 5, 32, 32], [32])
    with tf.variable_scope("conv2"):
        # Variables created here will be named "conv2/weights", "conv2/biases".
        return conv_relu([5, 5, 32, 32], [32])
with tf.variable_scope("image_filters") as scope:
    # my_image_filter is called twice below, but because variables are shared
    # the network's variables are created only once.
    result1 = my_image_filter()
    scope.reuse_variables()  # later get_variable calls in this scope reuse
    result2 = my_image_filter()
vs = tf.trainable_variables()
print('There are %d train_able_variables in the Graph: ' % len(vs))
for v in vs:
    print(v)
輸出爲:
There are 4 train_able_variables in the Graph:
<tf.Variable ‘image_filters/conv1/weights:0’ shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable ‘image_filters/conv1/biases:0’ shape=(32,) dtype=float32_ref>
<tf.Variable ‘image_filters/conv2/weights:0’ shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable ‘image_filters/conv2/biases:0’ shape=(32,) dtype=float32_ref>
簡言之,二者對於神經網路的作用就是讓我們寫出來的神經網路結構(節點和節點之間的連線)更加的清楚明瞭。
使用名稱空間前
使用名稱空間之後
下面我們用名稱空間整理一下之前簡單二分類的網路結構:
#!/usr/bin/env python
import os
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # only show TensorFlow C++ errors
# Physical GPUs visible to this process (comma-separated indices, no spaces).
os.environ['CUDA_VISIBLE_DEVICES'] = '12,13,14,15'
N_GPU = 4  # GPU number; must match the device list above
BATCH_SIZE = 100 * N_GPU  # global batch per step, split evenly across GPUs
LEARNING_RATE = 0.001
EPOCHS_NUM = 1000
NUM_THREADS = 10
DATA_DIR = 'data/tmp/'
LOG_DIR = 'data/tmp/log'
DATA_PATH = 'data/test_data.tfrecord'
def _parse_function(example_proto):
    """Parse one serialized ``tf.train.Example`` into float tensors.

    The feature spec expects a 5-element int64 feature ``'sample'`` and a
    scalar int64 feature ``'label'``; both are cast to ``tf.float32``.

    Returns:
        A dict with keys ``'sample'`` (shape [5]) and ``'label'`` (scalar).
    """
    dics = {
        'sample': tf.FixedLenFeature([5], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64),
    }
    parsed_example = tf.parse_single_example(example_proto, dics)
    parsed_example['sample'] = tf.cast(parsed_example['sample'], tf.float32)
    parsed_example['label'] = tf.cast(parsed_example['label'], tf.float32)
    return parsed_example
def _get_data(tfrecord_path=DATA_PATH, num_threads=NUM_THREADS, num_epochs=EPOCHS_NUM,
              batch_size=BATCH_SIZE, num_gpu=N_GPU):
    """Build the tf.data input pipeline under ``input_data`` and split it per GPU.

    Reads ``tfrecord_path`` and parses records with ``num_threads`` parallel
    calls. It then shuffles, repeats the data ``num_epochs`` times, and batches
    by ``batch_size`` (the global batch). Each batch is split into ``num_gpu``
    equal slices, so ``batch_size`` must be a multiple of ``num_gpu``.

    Returns:
        (x_split, y_split): lists of ``num_gpu`` sample / label tensors.
    """
    with tf.variable_scope('input_data'):
        dataset = tf.data.TFRecordDataset(tfrecord_path)
        new_dataset = dataset.map(_parse_function, num_parallel_calls=num_threads)
        shuffle_dataset = new_dataset.shuffle(buffer_size=10000)
        repeat_dataset = shuffle_dataset.repeat(num_epochs)
        # drop_remainder: tf.split below requires the batch to divide evenly
        # by num_gpu, so a smaller final batch could make the step fail.
        # (drop_remainder needs TensorFlow >= 1.10.)
        batch_dataset = repeat_dataset.batch(batch_size=batch_size, drop_remainder=True)
        iterator = batch_dataset.make_one_shot_iterator()
        next_element = iterator.get_next()
        x_split = tf.split(next_element['sample'], num_gpu)
        y_split = tf.split(next_element['label'], num_gpu)
    return x_split, y_split
def weight_bias_variable(weight_shape, bias_shape):
    """Get ``weight`` and ``bias`` in the current variable scope.

    Both use a normal(mean=0, stddev=1) initializer. Whether existing
    variables are reused or new ones are created depends on the enclosing
    ``tf.variable_scope``'s ``reuse`` setting.

    Returns:
        (weight, bias)
    """
    weight = tf.get_variable('weight', weight_shape,
                             initializer=tf.random_normal_initializer(mean=0, stddev=1))
    bias = tf.get_variable('bias', bias_shape,
                           initializer=tf.random_normal_initializer(mean=0, stddev=1))
    return weight, bias
def hidden_layer(x_data, input_dim, output_dim, layer_name):
    """Fully connected ReLU layer whose variables live in scope ``layer_name``.

    With ``reuse=tf.AUTO_REUSE``, the first call creates ``<layer_name>/weight``
    and ``<layer_name>/bias``; later calls return the same variables, so every
    GPU tower shares one set of parameters. Histogram summaries of the weight,
    bias and activations are recorded.

    Returns:
        relu(x_data @ weight + bias). Assuming x_data is [batch, input_dim],
        the result is [batch, output_dim].
    """
    with tf.variable_scope(layer_name, reuse=tf.AUTO_REUSE):
        weight, bias = weight_bias_variable([input_dim, output_dim], [output_dim])
        # Layer output.
        y_hidden = tf.nn.relu(tf.matmul(x_data, weight) + bias)
        tf.summary.histogram('weight', weight)
        tf.summary.histogram('bias', bias)
        tf.summary.histogram('y_hidden', y_hidden)
    return y_hidden
def output_grads(y_hidden, y_label, input_dim, output_dim, optimizer=None):
    """Output layer plus loss for one tower; returns the loss and its gradients.

    Variables live in the shared ``out_layer`` scope (``reuse=tf.AUTO_REUSE``),
    so all towers use the same output weights.

    Args:
        y_hidden: activations from the hidden layer.
        y_label: labels for this tower's slice of the batch.
        input_dim, output_dim: shape of the output weight matrix.
        optimizer: optimizer whose ``compute_gradients`` is called. Defaults
            to the module-level ``opt`` so existing callers keep working; pass
            it explicitly to avoid relying on that global.

    Returns:
        (loss_mean, grads): the scalar mean sigmoid cross-entropy loss, and
        the ``(gradient, variable)`` pairs from ``compute_gradients``.
    """
    if optimizer is None:
        optimizer = opt
    with tf.variable_scope('out_layer', reuse=tf.AUTO_REUSE):
        weight, bias = weight_bias_variable([input_dim, output_dim], [output_dim])
        tf.summary.histogram('bias', bias)
        y_out = tf.matmul(y_hidden, weight) + bias
        y_out = tf.reshape(y_out, [-1])
        loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=y_out, labels=y_label)
        loss_mean = tf.reduce_mean(loss, 0)
        tf.summary.scalar('loss', loss_mean)
        grads = optimizer.compute_gradients(loss_mean)
    return loss_mean, grads
def average_gradients(tower_grads):
    """Average gradients across GPU towers for synchronous data parallelism.

    Args:
        tower_grads: one list per GPU, each holding the ``(gradient, variable)``
            pairs returned by ``Optimizer.compute_gradients``. Every list must
            contain the same variables in the same order, which is what you
            get when all towers share the variables.

    Returns:
        A list of ``(average_gradient, variable)`` pairs for
        ``Optimizer.apply_gradients``. ``None`` gradients are ignored; a
        variable with no gradient on any tower is left out.
    """
    avg_grads = []
    # zip(*tower_grads) regroups the pairs per variable: grad_and_vars holds
    # one variable's (gradient, variable) pair from every GPU, in the order of
    # the variables (w1, b1, w2, b2 ...).
    for grad_and_vars in zip(*tower_grads):
        grads = [g for g, _ in grad_and_vars if g is not None]
        if not grads:
            continue
        # Stack on a new leading "GPU" axis, e.g. 4 x (5, 10) -> (4, 5, 10),
        # then average over that axis.
        grad = tf.reduce_mean(tf.stack(grads, axis=0), axis=0)
        # Why only the first pair's variable: the towers share variables (via
        # AUTO_REUSE), so every pair in grad_and_vars refers to the same one.
        var = grad_and_vars[0][1]
        avg_grads.append((grad, var))
    return avg_grads
with tf.name_scope('input_data'):
    x_split, y_split = _get_data()
opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
tower_grads = []
for i in range(N_GPU):
    with tf.device("/gpu:%d" % i):
        with tf.name_scope('GPU_%d' % i):
            y_hidden = hidden_layer(x_split[i], input_dim=5, output_dim=10,
                                    layer_name='hidden1')
            loss_mean, grads = output_grads(y_hidden, y_label=y_split[i],
                                            input_dim=10, output_dim=1)
            tower_grads.append(grads)
with tf.name_scope('update_parameters'):
    # Average the gradients across GPUs.
    grads = average_gradients(tower_grads)
    for grad, var in grads:
        # var.op.name (e.g. 'hidden1/weight') has no ':0' suffix, which is
        # not a legal character in a summary name.
        tf.summary.histogram('gradients/' + var.op.name, grad)
    # Update the parameters once per step.
    apply_gradient_op = opt.apply_gradients(grads)
init = tf.global_variables_initializer()
merged = tf.summary.merge_all()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True  # start with a small GPU memory allocation and grow on demand
config.allow_soft_placement = True  # fall back to an available device if the requested one is missing
config.log_device_placement = False
with tf.Session(config=config) as sess:
    sess.run(init)
    sess.run(tf.local_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    writer = tf.summary.FileWriter('data/tfboard', sess.graph)
    for step in range(1000):
        # Train and collect summaries in ONE run: a separate sess.run(merged)
        # would pull a second batch from the iterator on every step.
        _, summary = sess.run([apply_gradient_op, merged])
        writer.add_summary(summary, step)
    writer.close()
    coord.request_stop()
    coord.join(threads)
根據儲存路徑,在終端輸入:
$ tensorboard --logdir=路徑(例如:/Users/username/PycharmProjects/firsttensorflow/multigpu/data/tfboard)
在瀏覽器中輸入http://localhost:6006,出現tensorboard的介面,預設介面是scalar(標量):
切換到graph介面如下圖所示:
原文鏈接:https://www.jianshu.com/p/54c12a1a9c38