GPU单机单卡/单机多卡/多机单卡/多机多卡的实现过程
一单机单卡# -*- coding: UTF-8 -*-#单机单卡#对于单机单卡,可以吧参数和计算都定义再GPU上,不过如果参数模型比较大,显存不足等,就得放在CPU上import tensorflow as tfwith tf.device("/cpu:0"): #也可以放在GPU上w = tf.get_variable('w', (2,2), tf.float32, i...
·
一单机单卡
# -*- coding: UTF-8 -*- #单机单卡 #对于单机单卡,可以吧参数和计算都定义再GPU上,不过如果参数模型比较大,显存不足等,就得放在CPU上 import tensorflow as tf with tf.device("/cpu:0"): #也可以放在GPU上 w = tf.get_variable('w', (2,2), tf.float32, initializer=tf.constant_initializer(2)) b = tf.get_variable('b', (2,2), tf.float32, initializer=tf.constant_initializer(5)) with tf.device("/gpu:0"): addwb = w+b mutwb = w*b ini = tf.initialize_all_variables() with tf.Session() as sess: sess.run(ini) np1, np2 = sess.run([addwb, mutwb]) print(np1) print(np2)
二单机多卡的实现过程
"""#数据并行 分布式训练中的数据并行方法在每一个worker machine 上都有一套完整的模型,单分别对训练数据集的不通子集进行处理 数据并行训练方法均需要一些整合结果和在各工作器(worker)间同步模型参数的方法 1参数平均vs.基于更新(梯度)的方法:所有的参数加起来除以参数个数 1.1参数平均 (1)根据模型配置随机初始化网络参数 (2)将现有的参数的一个副本分配给每一个worker machine (3)在该数据的一个子集上对每一个worker进行训练 (4)从每一个worker的平均参数上设置一个全局参数 (5)当还需要处理更多数据时,回到第2步 2同步VS异步的方法 同步:每次计算都需要等到所有GPU运行完成之后,问题:等待时间长影响GPU利用率,取决于运算最慢的机器运行时间,机器之间通信太浪费 如果一台机器卡住/计算结果无穷大不会收敛,会导致所有参数没有上报Parameter Server服务器 异步:只要有一台计算完成,则会这台传输Parameter Server,会出现梯度失效问题。(开始参数都相同, 但是可能一个设备完成迭代后,发现模型参数被其它更新) 只能实现梯度次优解,不能实现最优解 不需要所有运算完成再计算。 3集中式VS.分布式的同步 模型并行(一般情况下不采用模型并行,) 把模型部署到很多机器上去运行,当神经网络模型很大的时候GPU显存会有限制,很难跑在单个GPU上,所以需要GPU并行 但实际上,层与层只有有约束,后面的层需要后面的层作为输入。 如果模型本身含有并行模块,则可以进行模型并行的训练 需要有参数服务器:K/V格式 ,再加上参数更新(并行文件系统)。计算节点为worker节点 ps服务器一般node0, node1, worker节点node2,node3, 分布式包含两种格式 in-graph模式:把计算节点从单机多GPU扩展到了多机多GPU, 不过数据分发还是在一个节点。 这样的好处是配置简单,但是这样的坏处是训练数据的分发依然在一个节点上, 要把训练数据分发到不通的机器上, 严重影响并发训练速度。在大数据训练的情况下,不推荐使用这种模式 between-graph模式下,训练的参数保存在参数服务器,数据不用分发,数据分片的保存在各个计算节点, 各个计算节点自己算自己的,算完之后,把要更新的参数告诉参数服务器,参数服务器更新参数。 这种模式的优点是不用训练数据的分发,尤其是在数据量在TB级的时候,所以大数据深度学习推荐使用between-graph模式。 """ # -*- coding: UTF-8 -*- #单机多卡 #各个GPU通过各自计算各自batch数据的梯度值,然后统一传到CPU上, 由CPU计算求取平均值,CPU更新参数 import tensorflow as tf with tf.device("/cpu:0"): w = tf.get_variable('w', (2,2), tf.float32, initializer=tf.constant_initializer(2)) b = tf.get_variable('b', (2, 2), tf.float32, initializer=tf.constant_initializer(5)) with tf.device("/gpu:0"): addwb = w + b with tf.device("/gpu:1"): mutwb = w*b ini = tf.initialize_all_variables() with tf.Session() as sess: sess.run(ini) while 1: print(sess.run([addwb, mutwb]))
三多机单卡
#多机单卡的关键点 "机"可以指什么? 物理机/docker容器 "多机"之间如何联通,如何交换信息? "多机单卡"相对于"单机多卡"性能比较 基本概念 。Cluster 。Job 。task # -*- coding: UTF-8 -*- import tensorflow as tf #现在假设我们有A,B,C,D四台机器,每台机器上由一个GPU,首先需要在各台机器上写一份代码,跑起来, 各机器上的代码内容大部分相同 #除了开始定义的时候,需要各自指定该台机器的task之外,以机器A为离子, A机器上的代码如下 cluster = tf.train.ClusterSpec({ "worker":[ "A_IP:2222", #IP地址:端口号,第一台机器A的IP地址,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0for "B_IP:1234", #第二台机器的IP地址/job:worker/task:1 "C_IP:2222"#第三台机器的IP地址/job:worker/task:2 ], "ps":[ "D_IP:2222",#第四台机器的IP地址对应到代码块:/job:ps/task:0 ]}) # -*- coding: UTF-8 -*- #上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的 #以为都是计算神经网络的每个batch前向传导,所以一般代码是重用的 import tensorflow as tf #现在假设我们有A,B台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的内容大部分相同 cluster = tf.train.ClusterSpec({ "worker":[ "192.168.11.105:1234"], #省略所有IP都需要写入 "ps":["192.168.11.130:2223"]})
四多机多卡的实现过程
#多机多卡 import tensorflow as tf var = tf.Variable(initial_value=0.0) #第一步,我们需要为每个进程创建自己的会话 sess1 = tf.Session() sess2 = tf.Session() sess1.run(tf.global_variables_initializer()) sess2.run(tf.global_variables_initializer()) print("Initial value of var in session 1:", sess1.run(var)) print("Initial value of var in session 2:", sess2.run(var)) sess1.run(var.assign_add(1.0)) sess2.run(var.assign_add(2.0)) print("Incremented var in session 1") print("Value of var in session 1: ", sess1.run(var)) print("Value of var in session 2: ", sess2.run(var)) #Distributed TensorFlow import tensorflow as tf c = tf.constant("Hello, Distributed TensorFlow!") #创建一个本地TensorFlow集群 server = tf.train.Server.create_local_server() #在集群上创建一个会话 sess = tf.Session(server.target) print(sess.run(c)) #第一步是定义集群的规模。我们从最简单的集群开始:即两台服务器(两个任务),它们都在同一台机器上,一个在 2222 端口,一个在 2223 端口。 tasks = ["localhost:2222", "localhost:2223"] #每个任务都与「工作」(job)相关联,该工作是相关任务的集合。我们将这两个任务与一个称为「local」的工作相关联。 jobs = {"local": tasks} #所有这些即定义为一个集群。 cluster = tf.train.ClusterSpec(jobs) server1 = tf.train.Server(cluster, job_name="local", task_index=0) server2 = tf.train.Server(cluster, job_name="local", task_index=1) #特性:任何具有相同名称的变量都将在所有服务器之间共享。 tf.reset_default_graph() var = tf.Variable(initial_value=0.0, name='var') sess1 = tf.Session(server1.target) sess2 = tf.Session(server2.target) sess1.run(tf.global_variables_initializer()) sess2.run(tf.global_variables_initializer()) print("Initial value of var in session 1:", sess1.run(var)) print("Initial value of var in session 2:", sess2.run(var)) sess1.run(var.assign_add(1.0)) print("Incremented var in session 1") print("Value of var in session 1:", sess1.run(var)) print("Value of var in session 2:", sess2.run(var)) print("OK------------------------------------") #存放 def run_with_location_trace(sess, op): run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) run_metadata = tf.RunMetadata() sess.run(op, options=run_options, run_metadata=run_metadata) for device in run_metadata.step_stats.dev_stats: print(device.device) for node in device.node_stats: print(" ", node.node_name) run_with_location_trace(sess1, var) run_with_location_trace(sess1, var.assign_add(1.0)) run_with_location_trace(sess2, var) with tf.device("/job:local/task:0"): var1 = tf.Variable(0.0, name='var1') with tf.device("/job:local/task:1"): var2 = tf.Variable(0.0, name='var2') # (This will initialize both variables) sess1.run(tf.global_variables_initializer()) run_with_location_trace(sess1, var1) run_with_location_trace(sess1, var2) run_with_location_trace(sess2, var2) run_with_location_trace(sess2, var1) #计算图 cluster = tf.train.ClusterSpec({"local": ["localhost:2224", "localhost:2225"]}) server1 = tf.train.Server(cluster, job_name="local", task_index=0) server2 = tf.train.Server(cluster, job_name="local", task_index=1) graph1 = tf.Graph() with graph1.as_default(): var1 = tf.Variable(0.0, name='var') sess1 = tf.Session(target=server1.target, graph=graph1) print(graph1.get_operations()) graph2 = tf.Graph() sess2 = tf.Session(target=server2.target, graph=graph2) print(graph2.get_operations()) with graph2.as_default(): var2 = tf.Variable(0.0, name='var') sess1.run(var1.assign(1.0)) sess2.run(var2) #实现细节 import tensorflow as tf #谁负责初始化共享变量 def s1(): server1 = tf.train.Server(cluster, job_name="local", task_index=0) var = tf.Variable(0.0, name='var') sess1 = tf.Session(server1.target) print("Server 1: waiting for connection...") sess1.run(tf.report_uninitialized_variables()) while len(sess1.run(tf.report_uninitialized_variables())) > 0: print("Server 1: waiting for initialization...") sleep(1.0) print("Server 1: variables initialized!") def s2(): server2 = tf.train.Server(cluster, job_name="local", task_index=1) var = tf.Variable(0.0, name='var') sess2 = tf.Session(server2.target) for i in range(3): print("Server 2: waiting %d seconds before initializing..." % (3 - i)) sleep(1.0) sess2.run(tf.global_variables_initializer()) p1 = Process(target=s1, daemon=True) p2 = Process(target=s2, daemon=True) p1.start() p2.start() p1.terminate() p2.terminate() #示例 #我们会创建: #一个存储单个变量 var 的参数服务器。 #两个工作站任务(worker task),每个工作站将多次增加变量 var 的值。 我们将让参数服务器多输出几次 var 的值,以便查看其变化。 import tensorflow as tf from multiprocessing import Process from time import sleep cluster = tf.train.ClusterSpec({ "worker": [ "localhost:3333", "localhost:3334", ], "ps": [ "localhost:3335" ] }) def parameter_server(): with tf.device("/job:ps/task:0"): var = tf.Variable(0.0, name='var') server = tf.train.Server(cluster, job_name="ps", task_index=0) sess = tf.Session(target=server.target) print("Parameter server: waiting for cluster connection...") sess.run(tf.report_uninitialized_variables()) print("Parameter server: cluster ready!") print("Parameter server: initializing variables...") sess.run(tf.global_variables_initializer()) print("Parameter server: variables initialized") for i in range(5): val = sess.run(var) print("Parameter server: var has value %.1f" % val) sleep(1.0) print("Parameter server: blocking...") server.join() def worker(worker_n): with tf.device("/job:ps/task:0"): var = tf.Variable(0.0, name='var') server = tf.train.Server(cluster, job_name="worker", task_index=worker_n) sess = tf.Session(target=server.target) print("Worker %d: waiting for cluster connection..." % worker_n) sess.run(tf.report_uninitialized_variables()) print("Worker %d: cluster ready!" % worker_n) while sess.run(tf.report_uninitialized_variables()): print("Worker %d: waiting for variable initialization..." % worker_n) sleep(1.0) print("Worker %d: variables initialized" % worker_n) for i in range(5): print("Worker %d: incrementing var" % worker_n) sess.run(var.assign_add(1.0)) sleep(1.0) print("Worker %d: blocking..." % worker_n) server.join() ps_proc = Process(target=parameter_server, daemon=True) w1_proc = Process(target=worker, args=(0,), daemon=True) w2_proc = Process(target=worker, args=(1,), daemon=True) ps_proc.start() w1_proc.start() w2_proc.start() for proc in [w1_proc, w2_proc, ps_proc]: proc.terminate() #如何将多个 TensorFlow 执行引擎(运行在不同进程或不同机器上)集成为一个集群,以便共享变量。 #如何为变量或操作指定服务器。 #图内复制与图间复制。 #如何等待变量被集群中的另一个任务初始化。
更多推荐
所有评论(0)