分布式 TensoFlow:入坑指南

翻译自:http://amid.fish/distributed-tensorflow-a-gentle-introduction

分布式 TensorFlow 允许我们在多个进程之间共享TensorFlow 的子图,而且各进程很可能运行在不同的机器上。


我们为什么要这样做呢? 常用方法是利用多台机器进行训练,所有机器之间共享参数。 即使我们只是在一台机器上运行,我已经遇到了两个强化学习的例子,这些学习必须共享进程:

  • 在论文 A3C中,多个代理在多个进程中并行运行,同时探索不同的环境副本。 每个代理都生成参数更新,这些更新也必须发送给不同进程中的其他代理。
  • 在论文 Deep Reinforcement Learning from Human Preferences中,一个进程运行一个探索环境的代理,通过由人类对代理行为的偏好训练的网络来计算奖励。 这个网络是在一个单独的进程中与代理异步训练的,这样代理不必等待每个训练周期结束再去探索。

不幸的是,关于分布式TensorFlow的官方文档过于简略。 我们需要一个稍微易懂的介绍 — 通过Jupyter运行一些基本例子。
如果你想follow Jupyter,可以在GitHub上找到源代码。
(注意:这里的一些解释是作者对实证结果/ TensorFlow文档的解释,如果发现有什么不对的地方,请给作者留言!)

Introduction

import tensorflow as tf

比方说,我们希望多个进程共享一些共同的参数。 为了简单起见,假设这只是一个单一的变量:

var = tf.Variable(initial_value=0.0)

第一步,我们可以想象每个进程都需要自己的会话。 (假设会话1在一个进程中创建,而会话2在另一个进程中创建)。

sess1 = tf.Session()
sess2 = tf.Session()

sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())

每次调用tf.Session()都会创建一个单独的“执行引擎”,然后将会话句柄连接到执行引擎。 执行引擎是实际存储变量值并运行操作的东西。
通常,不同进程中的执行引擎是不相关的。 在一个会话中更改变量(在一个执行引擎上)不会影响其他会话中的变量。

print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))

sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")

print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))

————————————————————————————————————————————
(输出结果)
Initial value of var in session 1: 0.0
Initial value of var in session 2: 0.0
Incremented var in session 1
Value of var in session 1: 1.0
Value of var in session 2: 0.0

 

Distributed TensorFlow

为了在进程之间共享变量,我们需要将不同的执行引擎连接在一起。 输入分布式张量流。
使用分布式TensorFlow,每个进程运行一个特殊的执行引擎:一个TensorFlow服务器。 服务器作为集群的一部分链接在一起。 (群集中的每个服务器也称为任务。)
第一步是定义集群的规模。 我们从最简单的集群开始:两台服务器(两个任务),都在同一台机器上; 一个在2222端口,一个在2223端口。

tasks = ["localhost:2222", "localhost:2223"]

每个任务都与作业相关联,该作业是相关任务的集合。 我们将这两个任务与一个称为“本地”的工作相关联。

jobs = {"local": tasks}

所有这些即为集群。

cluster = tf.train.ClusterSpec(jobs)

我们现在可以启动服务器,指定每个服务器对应为集群定义中的哪个服务器。 立即启动各服务器,监听集群设置中指定的端口。

# "This server corresponds to the the first task (task_index=0)
# of the tasks associated with the 'local' job."
server1 = tf.train.Server(cluster, job_name="local", task_index=0)

server2 = tf.train.Server(cluster, job_name="local", task_index=1)

将服务器连接在同一个集群中,我们现在可以体验到分布式TensorFlow的强大功能:任何具有相同名称的变量都将在所有服务器之间共享。
最简单的例子是在所有的服务器上运行同一张图,每个图只有一个变量,像以前一样:

tf.reset_default_graph()
var = tf.Variable(initial_value=0.0, name='var')
sess1 = tf.Session(server1.target)
sess2 = tf.Session(server2.target)

现在,在一台服务器上对变量所作的修改将在第二台服务器上作镜像处理。

sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())

print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))

sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")

print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))

————————————————————————————————————————————————————————————————————

Initial value of var in session 1: 0.0
Initial value of var in session 2: 0.0
Incremented var in session 1
Value of var in session 1: 1.0
Value of var in session 2: 1.0

(请注意,因为我们只有一个变量且该变量由两个会话共享,第二个会话再调用 global_variables_initializer 就有些多余

Placement

现在我们可能在想:变量究竟存储在哪个服务器上? 又是哪个服务器运行这些操作?
按经验来说,变量和操作都默认存储在集群的第一个任务上。

def run_with_location_trace(sess, op):
    # From https://stackoverflow.com/a/41525764/7832197
    run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
    run_metadata = tf.RunMetadata()
    sess.run(op, options=run_options, run_metadata=run_metadata)
    for device in run_metadata.step_stats.dev_stats:
      print(device.device)
      for node in device.node_stats:
        print("  ", node.node_name)

例如,如果我们使用连接到第一个任务的会话来处理变量,那么所有操作都会运行在这个任务上:

run_with_location_trace(sess1, var)

——————————————————————————————————————————————————————

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var

 

run_with_location_trace(sess1, var.assign_add(1.0))

——————————————————————————————————————————————————————————

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   AssignAdd_1/value
   var
   AssignAdd_1

但是,如果我们尝试使用连接到第二个任务的会话做些什么,那么图节点仍然会在第一个任务上运行。

run_with_location_trace(sess2, var)

———————————————————————————————————————————————————————————

/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var

要将一个变量或一个操作固定到一个特定的任务,我们可以使用tf.device:

with tf.device("/job:local/task:0"):
    var1 = tf.Variable(0.0, name='var1')
with tf.device("/job:local/task:1"):
    var2 = tf.Variable(0.0, name='var2')
    
# (This will initialize both variables)
sess1.run(tf.global_variables_initializer())

现在 变量1 像之前一样运行在第一个任务上。

run_with_location_trace(sess1, var1)

————————————————————————————————————————————————

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var1

但是var2运行在第二个任务上。 即使我们尝试使用连接到第一个任务的会话来评测它,它仍然在第二个任务上运行。

run_with_location_trace(sess1, var2)

————————————————————————————————————————————————

/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
   var2

变量2 亦是如此。

run_with_location_trace(sess2, var2)

————————————————————————————————————————————————

/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
   var2

 

run_with_location_trace(sess2, var1)

————————————————————————————————————————————————————

/job:local/replica:0/task:1/device:CPU:0
   _SOURCE
/job:local/replica:0/task:0/device:CPU:0
   _SOURCE
   var1

 

Graphs

分布式TensorFlow处理图的过程有几点需要注意。

Who builds the graph?

首先,尽管在整个集群中共享变量值,但图并不会自动共享。
我们用两台服务器创建一个新的集群,然后在第一台服务器上显式创建一个图。

cluster = tf.train.ClusterSpec({"local": ["localhost:2224", "localhost:2225"]})
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)
graph1 = tf.Graph()
with graph1.as_default():
    var1 = tf.Variable(0.0, name='var')
sess1 = tf.Session(target=server1.target, graph=graph1)
print(graph1.get_operations())

————————————————————————————————————————————————————————————————————————————

[<tf.Operation 'var/initial_value' type=Const>, <tf.Operation 'var' type=VariableV2>,
 <tf.Operation 'var/Assign' type=Assign>, <tf.Operation 'var/read' type=Identity>]

如果我们创建连接到第二台服务器的会话,请注意图不会自动获取镜像。

graph2 = tf.Graph()
sess2 = tf.Session(target=server2.target, graph=graph2)
print(graph2.get_operations())

——————————————————————————————————————————————————————————
[]

要访问共享变量,我们必须手动添加一个同名的变量到第二个图中。

with graph2.as_default():
    var2 = tf.Variable(0.0, name='var')

只有如此我们才可以访问到它。

sess1.run(var1.assign(1.0))
sess2.run(var2)
————————————————————————————————
1.0

关键是:每个服务器负责建立自己的图。

Does the graph have to be the same on all servers?

到目前为止,我们所有的例子都是在两台服务器上运行相同的图。 这被称为图内复制。
例如,假设我们有一个包含三台服务器的集群。 服务器1保存共享参数,而服务器2和服务器3是工作节点,每个都有本地变量。 在图内复制中,每台服务器的图如下所示:


图内复制的问题在于,每个服务器都必须具有整个图的副本,包括可能只与其他服务器相关的子图。 这可能会导致图变得非常大。

另一种方法是图间复制。 在这里,每个服务器都运行一个只包含共享参数的图,而且任何变量和操作都与某一个服务器相关。

由于它缩减图的大小,我们推荐使用图间复制。

Practical Details

在本文收工之前,有几个实际中遇到的细节问题值得讨论一下。

What happens if we try to run something on the cluster before all servers have connected?

我们再次创建一个双任务集群。

cluster = tf.train.ClusterSpec({
    "local": ["localhost:2226", "localhost:2227"]
})

这一次,让我们在隔离进程中启动每个服务器。 (这允许我们随时关闭服务器,以便再次启动它们进行后续的实验。除了杀死启动它们的进程之外,目前没有其它办法关闭服务器。)

from multiprocessing import Process
from time import sleep

def s1():
    server1 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=0)
    sess1 = tf.Session(server1.target)
    print("server 1: running no-op...")
    sess1.run(tf.no_op())
    print("server 1: no-op run!")
    server1.join() # Block

def s2():
    for i in range(3):
        print("server 2: %d seconds left before connecting..."
              % (3 - i))
        sleep(1.0)
    server2 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=1)
    print("server 2: connected!")
    server2.join() # Block

# daemon=True so that these processes will definitely be killed
# when the parent process restarts
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)

服务器1即刻加入集群,但服务器2在连接之前等待了一会儿。 结果如下所示。

p1.start()
p2.start()
——————————————————————————————————————————————————————

server 2: 3 seconds left before connecting...
server 1: running no-op...
server 2: 2 seconds left before connecting...
server 2: 1 seconds left before connecting...
server 2: connected!
server 1: no-op run!

可以看出,每个服务器都试图在集群上运行一个操作,直到所有的服务器都加入。

p1.terminate()
p2.terminate()

What happens if a server leaves the cluster?

我们用两台服务器建立一个集群。 服务器1将只是反复尝试和运行位于服务器1上的 no-op 操作。服务器2将在两秒钟后宕机。

def s1():
    server1 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=0)
    
    with tf.device("/job:local/task:0"):
        no_op = tf.no_op()
        
    sess1 = tf.Session(server1.target)
    for _ in range(6):
        print("Server 1: about to run no-op...", end="")
        sess1.run(no_op)
        print("success!")
        sleep(1.0)

def s2():
    server2 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=1)
    sleep(2.0)
    print("Server 2 dieing...")
    
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)

p1.start()
p2.start()

——————————————————————————————————————————

Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 2 dieing...
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!

在短期内,只要我们试图运行的操作不在脱离的服务器上,似乎不会出现问题。 (我没有测试过长期运行会发生什么。)
如果操作是在脱离的服务器上…

def s1():server1 = tf.train.Server(cluster,job_name="local",task_index=0)# This time, we place the no-op on server 2,# which is going to leavewith tf.device("/job:local/task:1"):no_op = tf.no_op()sess1 = tf.Session(server1.target)for _ in range(5):print("Server 1: about to run no-op...", end="")sess1.run(no_op)print("success!")sleep(1.0)p1 = Process(target=s1, daemon=True)p2 = Process(target=s2, daemon=True)p1.start()p2.start()

 

def s1():
    server1 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=0)
    
    # This time, we place the no-op on server 2,
    # which is going to leave
    with tf.device("/job:local/task:1"):
        no_op = tf.no_op()
        
    sess1 = tf.Session(server1.target)
    for _ in range(5):
        print("Server 1: about to run no-op...", end="")
        sess1.run(no_op)
        print("success!")
        sleep(1.0)
    
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)

p1.start()
p2.start()

——————————————————————————————————————————————————
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 2 dieing...

然后试图运行操作代码。

p1.terminate()
p2.terminate()

如果服务器又加入集群会怎样?

p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()
sleep(3.0)
# At this point, server 1 is blocked, and server 2 is dead.
print("Restarting server 2...")
p2 = Process(target=s2, daemon=True)
p2.start()

Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 2 dieing...
Restarting server 2...
Process Process-7:
Traceback (most recent call last):
  File "/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1323, in _do_call
    return fn(*args)
  File "/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1302, in _run_fn
    status, run_metadata)
  File "/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
    c_api.TF_GetCode(self.status.status))
tensorflow.python.framework.errors_impl.AbortedError: Graph handle is not found: 0000000000000001
Server 1: about to run no-op...Server 2 dieing...

报了一个 Graph handle is not found 错误。
因此分布式TensorFlow不会自动恢复服务器故障。 (如果您对容错有兴趣,请查看TensorFlow Dev Summit video on Distributed TensorFlow

Who’s responsible for initializing shared variables?

一种方法是让所有工作机调用 tf.global_variables_initializer()
但是如果我们想保持代码整洁并且只用一个服务器进行初始化,那么如果有其他服务器在初始化之前尝试使用这些变量,可能会遇到问题。 避免这个问题的一个选择是让其他工作机等待,直到使用tf.report_uninitialized_variables进行初始化。

def s1():
    server1 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=0)
    var = tf.Variable(0.0, name='var')
    sess1 = tf.Session(server1.target)
    
    print("Server 1: waiting for connection...")
    sess1.run(tf.report_uninitialized_variables())
    while len(sess1.run(tf.report_uninitialized_variables())) > 0:
        print("Server 1: waiting for initialization...")
        sleep(1.0)
    print("Server 1: variables initialized!")

def s2():
    server2 = tf.train.Server(cluster,
                              job_name="local",
                              task_index=1)
    var = tf.Variable(0.0, name='var')
    sess2 = tf.Session(server2.target)
    
    for i in range(3):
        print("Server 2: waiting %d seconds before initializing..."
              % (3 - i))
        sleep(1.0)
    sess2.run(tf.global_variables_initializer())
    
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()
————————————————————————————————————————————————————————————————

Server 1: waiting for connection...
Server 2: waiting 3 seconds before initializing...
Server 1: waiting for initialization...
Server 2: waiting 2 seconds before initializing...
Server 1: waiting for initialization...
Server 2: waiting 1 seconds before initializing...
Server 1: waiting for initialization...
Server 1: variables initialized!

 

p1.terminate()
p2.terminate()

Example

让我们把所学的知识融合到最后一个使用多进程的例子中。
我们将创建:

  • 一个存储单变量的参数服务器。
  • 两个工作任务,每个任务隔一段时间增加一下变量的值。

我们将让参数服务器多打印几次变量的值,以便我们可以真正看到它的变化。

import tensorflow as tf
from multiprocessing import Process
from time import sleep

cluster = tf.train.ClusterSpec({
    "worker": [
        "localhost:3333",
        "localhost:3334",
    ],
    "ps": [
        "localhost:3335"
    ]
})

def parameter_server():
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')

    server = tf.train.Server(cluster,
                             job_name="ps",
                             task_index=0)
    sess = tf.Session(target=server.target)
    
    print("Parameter server: waiting for cluster connection...")
    sess.run(tf.report_uninitialized_variables())
    print("Parameter server: cluster ready!")
    
    print("Parameter server: initializing variables...")
    sess.run(tf.global_variables_initializer())
    print("Parameter server: variables initialized")
    
    for i in range(5):
        val = sess.run(var)
        print("Parameter server: var has value %.1f" % val)
        sleep(1.0)

    print("Parameter server: blocking...")
    server.join()
    

def worker(worker_n):
    with tf.device("/job:ps/task:0"):
        var = tf.Variable(0.0, name='var')
        
    server = tf.train.Server(cluster,
                             job_name="worker",
                             task_index=worker_n)
    sess = tf.Session(target=server.target)
    
    print("Worker %d: waiting for cluster connection..." % worker_n)
    sess.run(tf.report_uninitialized_variables())
    print("Worker %d: cluster ready!" % worker_n)
    
    while sess.run(tf.report_uninitialized_variables()):
        print("Worker %d: waiting for variable initialization..." % worker_n)
        sleep(1.0)
    print("Worker %d: variables initialized" % worker_n)
    
    for i in range(5):
        print("Worker %d: incrementing var" % worker_n)
        sess.run(var.assign_add(1.0))
        sleep(1.0)
    
    print("Worker %d: blocking..." % worker_n)
    server.join()

ps_proc = Process(target=parameter_server, daemon=True)
w1_proc = Process(target=worker, args=(0, ), daemon=True)
w2_proc = Process(target=worker, args=(1, ), daemon=True)
ps_proc.start()
Parameter server: waiting for cluster connection...
Parameter server: cluster ready!
Parameter server: initializing variables...
Parameter server: variables initialized
Parameter server: var has value 0.0
Parameter server: var has value 2.0
Parameter server: var has value 4.0
Parameter server: var has value 5.0
Parameter server: var has value 7.0
Parameter server: blocking...
w1_proc.start()
Worker 0: waiting for cluster connection...
Worker 0: cluster ready!
Worker 0: waiting for variable initialization...
Worker 0: variables initialized
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: blocking...
w2_proc.start()
Worker 1: waiting for cluster connection...
Worker 1: cluster ready!
Worker 1: waiting for variable initialization...
Worker 1: variables initialized
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: blocking...
for proc in [w1_proc, w2_proc, ps_proc]:
    proc.terminate()

Final words

我们学到了:

  • 如何将多个TensorFlow执行引擎(运行在不同的进程或不同的机器上)连接在一起,以便共享变量。
  • 如果为变量或操作指定服务器。
  • 图内复制与图间复制。
  • 如果我们尝试在所有服务器互联之前或在服务器离开之后在集群上运行操作,会发生什么。
  • 如何等待变量被集群中的另一个任务初始化。

更多信息及实例请查阅官方文档 Distributed TensorFlow

发表评论

电子邮件地址不会被公开。 必填项已用*标注