Esta página foi traduzida pela API Cloud Translation.
Switch to English

Treinamento distribuído com o TensorFlow

Ver em TensorFlow.org Executar no Google Colab Ver fonte no GitHub Download do caderno

Visão geral

tf.distribute.Strategy é uma API do TensorFlow para distribuir o treinamento entre várias GPUs, várias máquinas ou TPUs. Usando esta API, você pode distribuir seus modelos e códigos de treinamento existentes com alterações mínimas de código.

tf.distribute.Strategy foi projetado com esses objetivos principais em mente:

  • Fácil de usar e oferecer suporte a vários segmentos de usuários, incluindo pesquisadores, engenheiros de ML, etc.
  • Forneça um bom desempenho pronto para uso.
  • Troca fácil entre estratégias.

tf.distribute.Strategy pode ser usado com uma API de alto nível, como Keras , e também pode ser usado para distribuir loops de treinamento personalizados (e, em geral, qualquer cálculo usando o TensorFlow).

No TensorFlow 2.x, você pode executar seus programas com entusiasmo ou em um gráfico usando tf.function . tf.distribute.Strategy pretende suportar esses dois modos de execução, mas funciona melhor com tf.function . O modo Ansioso é recomendado apenas para fins de depuração e não é suportado pelo TPUStrategy . Embora discutamos o treinamento na maioria das vezes neste guia, essa API também pode ser usada para distribuir avaliações e previsões em diferentes plataformas.

Você pode usar tf.distribute.Strategy com muito poucas alterações no seu código, porque alteramos os componentes subjacentes do TensorFlow para tornar-se ciente da estratégia. Isso inclui variáveis, camadas, modelos, otimizadores, métricas, resumos e pontos de verificação.

Neste guia, explicamos vários tipos de estratégias e como você pode usá-las em diferentes situações.

 # Import TensorFlow
import tensorflow as tf
 

Tipos de estratégias

tf.distribute.Strategy pretende abranger vários casos de uso em diferentes eixos. Atualmente, algumas dessas combinações são suportadas e outras serão adicionadas no futuro. Alguns desses eixos são:

  • Treinamento síncrono vs assíncrono: essas são duas maneiras comuns de distribuir o treinamento com paralelismo de dados. No treinamento de sincronização, todos os funcionários treinam sobre diferentes fatias de dados de entrada em sincronia e agregam gradientes a cada etapa. No treinamento assíncrono, todos os trabalhadores estão treinando independentemente os dados de entrada e atualizando variáveis ​​de forma assíncrona. Normalmente, o treinamento de sincronização é suportado por meio de redução total e assíncrona por meio da arquitetura do servidor de parâmetros.
  • Plataforma de hardware: você pode escalar seu treinamento em várias GPUs em uma máquina ou em várias máquinas em uma rede (com 0 ou mais GPUs cada) ou em Cloud TPUs.

Para dar suporte a esses casos de uso, existem seis estratégias disponíveis. Na próxima seção, explicamos quais deles são suportados em quais cenários no TF 2.2 no momento. Aqui está uma rápida visão geral:

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API do Keras Suportado Suportado Suporte experimental Suporte experimental Mensagem planejada suportada 2.3
Loop de treinamento personalizado Suportado Suportado Suporte experimental Suporte experimental Mensagem planejada suportada 2.3
API do estimador Suporte limitado Não suportado Suporte limitado Suporte limitado Suporte limitado

MirroredStrategy

tf.distribute.MirroredStrategy suporta treinamento distribuído síncrono em várias GPUs em uma máquina. Ele cria uma réplica por dispositivo GPU. Cada variável no modelo é espelhada em todas as réplicas. Juntas, essas variáveis ​​formam uma única variável conceitual chamada MirroredVariable . Essas variáveis ​​são mantidas em sincronia, aplicando atualizações idênticas.

Algoritmos eficientes de redução total são usados ​​para comunicar as atualizações variáveis ​​entre os dispositivos. Reduza todos os tensores agregados em todos os dispositivos, adicionando-os e disponibilizando-os em cada dispositivo. É um algoritmo fundido que é muito eficiente e pode reduzir significativamente a sobrecarga da sincronização. Existem muitos algoritmos e implementações de redução total disponíveis, dependendo do tipo de comunicação disponível entre os dispositivos. Por padrão, ele usa o NVIDIA NCCL como a implementação de redução total. Você pode escolher entre algumas outras opções que fornecemos ou escrever suas próprias.

Aqui está a maneira mais simples de criar o MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Isso criará uma instância MirroredStrategy que usará todas as GPUs visíveis para o TensorFlow e usará a NCCL como a comunicação entre dispositivos.

Se você deseja usar apenas algumas das GPUs em sua máquina, faça o seguinte:

 mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
 
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Se você deseja substituir a comunicação entre dispositivos, pode fazê-lo usando o argumento cross_device_ops , fornecendo uma instância de tf.distribute.CrossDeviceOps . Atualmente, tf.distribute.HierarchicalCopyAllReduce e tf.distribute.ReductionToOneDevice são duas opções diferentes de tf.distribute.NcclAllReduce que é o padrão.

 mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy permite executar o treinamento do TensorFlow em unidades de processamento de tensor (TPUs). As TPUs são ASICs especializadas do Google, projetadas para acelerar drasticamente as cargas de trabalho de aprendizado de máquina. Eles estão disponíveis no Google Colab, na TensorFlow Research Cloud e na Cloud TPU .

Em termos de arquitetura de treinamento distribuído, o TPUStrategy é o mesmo MirroredStrategy - implementa treinamento distribuído síncrono. As TPUs fornecem sua própria implementação de operações eficientes de redução total e outras operações coletivas em vários núcleos de TPU, que são usadas na TPUStrategy .

Aqui está como você instanciaria a TPUStrategy :

 cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
 

A instância TPUClusterResolver ajuda a localizar as TPUs. No Colab, você não precisa especificar nenhum argumento para ele.

Se você deseja usar isso para TPUs na nuvem:

  • Você deve especificar o nome do seu recurso TPU no argumento tpu .
  • Você deve inicializar o sistema tpu explicitamente no início do programa. Isso é necessário antes que as TPUs possam ser usadas para o cálculo. A inicialização do sistema tpu também apaga a memória do TPU, por isso é importante concluir esta etapa primeiro para evitar a perda de estado.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy é muito semelhante ao MirroredStrategy . Ele implementa treinamento distribuído síncrono em vários trabalhadores, cada um com GPUs potencialmente múltiplas. Semelhante ao MirroredStrategy , ele cria cópias de todas as variáveis ​​no modelo em cada dispositivo em todos os trabalhadores.

Ele usa CollectiveOps como o método de comunicação com redução de vários trabalhadores usado para manter as variáveis ​​sincronizadas. Uma operação coletiva é uma operação única no gráfico TensorFlow que pode escolher automaticamente um algoritmo de redução total no tempo de execução do TensorFlow, de acordo com o hardware, a topologia de rede e os tamanhos dos tensores.

Também implementa otimizações de desempenho adicionais. Por exemplo, inclui uma otimização estática que converte várias reduções totais em tensores pequenos em menos reduções totais em tensores maiores. Além disso, estamos projetando-o para ter uma arquitetura de plug-in - para que, no futuro, você possa plug-in de algoritmos mais ajustados para o seu hardware. Observe que as operações coletivas também implementam outras operações coletivas, como transmissão e coleta total.

Aqui está a maneira mais simples de criar o MultiWorkerMirroredStrategy :

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

Atualmente, o MultiWorkerMirroredStrategy permite escolher entre duas implementações diferentes de operações coletivas. CollectiveCommunication.RING implementa coletivos baseados em anel usando o gRPC como camada de comunicação. CollectiveCommunication.NCCL usa o NCCL da Nvidia para implementar coletivos. CollectiveCommunication.AUTO adia a opção para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e a interconexão da rede no cluster. Você pode especificá-los da seguinte maneira:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

Uma das principais diferenças para o treinamento de vários trabalhadores, em comparação com o treinamento de várias GPUs, é a configuração de vários trabalhadores. A variável de ambiente TF_CONFIG é a maneira padrão no TensorFlow para especificar a configuração do cluster para cada trabalhador que faz parte do cluster. Saiba mais sobre a configuração do TF_CONFIG .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy faz treinamento síncrono. As variáveis ​​não são espelhadas; elas são colocadas na CPU e as operações são replicadas em todas as GPUs locais. Se houver apenas uma GPU, todas as variáveis ​​e operações serão colocadas nessa GPU.

Crie uma instância do CentralStorageStrategy :

 central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
 
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Isso criará uma instância CentralStorageStrategy que usará todas as GPUs e CPU visíveis. A atualização para variáveis ​​nas réplicas será agregada antes de ser aplicada às variáveis.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy suporta o treinamento de servidores de parâmetros em várias máquinas. Nesta configuração, algumas máquinas são designadas como trabalhadores e outras como servidores de parâmetros. Cada variável do modelo é colocada em um servidor de parâmetros. A computação é replicada em todas as GPUs de todos os trabalhadores.

Em termos de código, é semelhante a outras estratégias:

 ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
 

Para o treinamento de vários trabalhadores, TF_CONFIG precisa especificar a configuração de servidores e trabalhadores de parâmetros em seu cluster, sobre os quais você pode ler mais em TF_CONFIG abaixo .

Outras estratégias

Além das estratégias acima, existem duas outras estratégias que podem ser úteis para tf.distribute protótipos e depuração ao usar APIs tf.distribute .

Estratégia Padrão

Estratégia padrão é uma estratégia de distribuição que está presente quando nenhuma estratégia de distribuição explícita está no escopo. Ele implementa a interface tf.distribute.Strategy , mas é uma passagem e não fornece distribuição real. Por exemplo, strategy.run(fn) simplesmente chamará fn . O código gravado usando essa estratégia deve se comportar exatamente como o código gravado sem nenhuma estratégia. Você pode pensar nisso como uma estratégia "não operacional".

A estratégia padrão é um singleton - e não é possível criar mais instâncias. Pode ser obtido usando tf.distribute.get_strategy() fora do escopo de qualquer estratégia explícita (a mesma API que pode ser usada para colocar a estratégia atual dentro do escopo de uma estratégia explícita).

 default_strategy = tf.distribute.get_strategy()
 

Essa estratégia serve a dois propósitos principais:

  • Ele permite escrever código de biblioteca com reconhecimento de distribuição incondicionalmente. Por exemplo, no otimizador, podemos fazer tf.distribute.get_strategy() e usar essa estratégia para reduzir gradientes - ele sempre retornará um objeto de estratégia no qual podemos chamar a API de redução.
 # In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
 
1.0
  • Semelhante ao código da biblioteca, ele pode ser usado para escrever programas de usuários finais para trabalhar com e sem estratégia de distribuição, sem exigir lógica condicional. Um trecho de código de exemplo que ilustra isso:
 if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
 
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy é uma estratégia para colocar todas as variáveis ​​e cálculos em um único dispositivo especificado.

 strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
 

Essa estratégia é diferente da estratégia padrão de várias maneiras. Na estratégia padrão, a lógica de posicionamento variável permanece inalterada quando comparada à execução do TensorFlow sem nenhuma estratégia de distribuição. Porém, ao usar o OneDeviceStrategy , todas as variáveis ​​criadas em seu escopo são explicitamente colocadas no dispositivo especificado. Além disso, todas as funções chamadas via OneDeviceStrategy.run também serão colocadas no dispositivo especificado.

A entrada distribuída por essa estratégia será pré-buscada no dispositivo especificado. Na estratégia padrão, não há distribuição de entrada.

Semelhante à estratégia padrão, essa estratégia também pode ser usada para testar seu código antes de mudar para outras estratégias que realmente são distribuídas para vários dispositivos / máquinas. Isso exercitará o mecanismo da estratégia de distribuição um pouco mais do que a estratégia padrão, mas não em toda a extensão como usar MirroredStrategy ou TPUStrategy etc. Se você deseja um código que se comporte como se não houvesse estratégia, use a estratégia padrão.

Até agora, falamos sobre quais são as diferentes estratégias disponíveis e como você pode instancia-las. Nas próximas seções, falaremos sobre as diferentes maneiras pelas quais você pode usá-las para distribuir seu treinamento. Mostraremos trechos curtos de código neste guia e vincularemos a tutoriais completos que você pode executar de ponta a ponta.

Usando tf.distribute.Strategy com tf.keras.Model.fit

tf.distribute.Strategy no tf.keras que é a implementação do TensorFlow da especificação da API Keras . tf.keras é uma API de alto nível para construir e treinar modelos. Ao integrar no tf.keras back-end, simplificamos a distribuição do treinamento escrito na estrutura de treinamento Keras usando model.fit .

Aqui está o que você precisa alterar no seu código:

  1. Crie uma instância do tf.distribute.Strategy apropriado.
  2. Mova a criação do modelo, otimizador e métricas Keras dentro de strategy.scope .

Suportamos todos os tipos de modelos Keras - seqüenciais, funcionais e subclassificados.

Aqui está um trecho de código para fazer isso em um modelo Keras muito simples com uma camada densa:

 mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Neste exemplo, usamos o MirroredStrategy para que possamos executar isso em uma máquina com várias GPUs. strategy.scope() indica para Keras qual estratégia usar para distribuir o treinamento. Criar modelos / otimizadores / métricas dentro desse escopo nos permite criar variáveis ​​distribuídas em vez de variáveis ​​regulares. Uma vez configurado, você pode ajustar seu modelo como faria normalmente. MirroredStrategy cuida de replicar o treinamento do modelo nas GPUs disponíveis, agregando gradientes e muito mais.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
 
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.0035
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.4436
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.2755

0.27546340227127075

Aqui usamos um tf.data.Dataset para fornecer o treinamento e a avaliação de avaliação. Você também pode usar matrizes numpy:

 import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
 
Epoch 1/2
10/10 [==============================] - 0s 1ms/step - loss: 0.1961
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.0867

<tensorflow.python.keras.callbacks.History at 0x7fa17808e240>

Nos dois casos (conjunto de dados ou numpy), cada lote da entrada fornecida é dividido igualmente entre as várias réplicas. Por exemplo, se você usar o MirroredStrategy com 2 GPUs, cada lote de tamanho 10 será dividido entre as 2 GPUs, cada uma recebendo 5 exemplos de entrada em cada etapa. Cada época será treinada mais rapidamente à medida que você adiciona mais GPUs. Normalmente, você deseja aumentar o tamanho do lote à medida que adiciona mais aceleradores, de modo a fazer uso efetivo do poder de computação extra. Você também precisará ajustar novamente sua taxa de aprendizado, dependendo do modelo. Você pode usar strategy.num_replicas_in_sync para obter o número de réplicas.

 # Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
 

O que é suportado agora?

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
APIs Keras Suportado Suportado Suporte experimental Suporte experimental Suporte planejado pós 2.3

Exemplos e tutoriais

Aqui está uma lista de tutoriais e exemplos que ilustram a integração acima de ponta a ponta com o Keras:

  1. Tutorial para treinar MNIST com MirroredStrategy .
  2. Tutorial para treinar MNIST usando o MultiWorkerMirroredStrategy .
  3. Guia sobre o treinamento do MNIST usando o TPUStrategy .
  4. TensorFlow Modelo Garden repositório contendo coleções de modelos state-of-the-art implementadas usando várias estratégias.

Usando tf.distribute.Strategy com loops de treinamento personalizados

Como você viu, usar tf.distribute.Strategy com Keras model.fit requer a alteração de apenas algumas linhas do seu código. Com um pouco mais de esforço, você também pode usar tf.distribute.Strategy com loops de treinamento personalizados.

Se você precisar de mais flexibilidade e controle sobre seus loops de treinamento do que é possível com o Estimator ou o Keras, poderá criar loops de treinamento personalizados. Por exemplo, ao usar um GAN, convém executar um número diferente de etapas de gerador ou discriminador a cada rodada. Da mesma forma, as estruturas de alto nível não são muito adequadas para o treinamento de Aprendizado por Reforço.

Para oferecer suporte a loops de treinamento personalizados, fornecemos um conjunto principal de métodos por meio das classes tf.distribute.Strategy . O uso desses recursos pode exigir uma pequena reestruturação do código inicialmente, mas, uma vez feito isso, você poderá alternar entre GPUs, TPUs e várias máquinas simplesmente alterando a instância da estratégia.

Aqui, mostraremos um breve trecho ilustrando esse caso de uso para um exemplo de treinamento simples, usando o mesmo modelo Keras de antes.

Primeiro, criamos o modelo e o otimizador dentro do escopo da estratégia. Isso garante que quaisquer variáveis ​​criadas com o modelo e o otimizador sejam variáveis ​​espelhadas.

 with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()
 

Em seguida, criamos o conjunto de dados de entrada e chamamos tf.distribute.Strategy.experimental_distribute_dataset para distribuir o conjunto de dados com base na estratégia.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
 

Em seguida, definimos uma etapa do treinamento. Usaremos tf.GradientTape para calcular gradientes e o otimizador para aplicá-los para atualizar as variáveis ​​do nosso modelo. Para distribuir essa etapa de treinamento, colocamos uma função train_step e a passamos para tf.distrbute.Strategy.run juntamente com as entradas do conjunto de dados que obtemos de dist_dataset criado antes:

 loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
 

Algumas outras coisas a serem observadas no código acima:

  1. Usamos tf.nn.compute_average_loss para calcular a perda. tf.nn.compute_average_loss soma a perda por exemplo e divide a soma pelo global_batch_size. Isso é importante porque, depois que os gradientes são calculados em cada réplica, eles são agregados nas réplicas, somando -as.
  2. Usamos a API tf.distribute.Strategy.reduce para agregar os resultados retornados por tf.distribute.Strategy.run . tf.distribute.Strategy.run retorna resultados de cada réplica local na estratégia e existem várias maneiras de consumir esse resultado. Você pode reduce los para obter um valor agregado. Você também pode tf.distribute.Strategy.experimental_local_results para obter a lista de valores contidos no resultado, um por réplica local.
  3. Quando apply_gradients é chamado dentro de um escopo de estratégia de distribuição, seu comportamento é modificado. Especificamente, antes de aplicar gradientes em cada instância paralela durante o treinamento síncrono, ele executa uma soma sobre todas as réplicas dos gradientes.

Por fim, depois de definir a etapa de treinamento, podemos iterar sobre dist_dataset e executar o treinamento em um loop:

 for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
 
tf.Tensor(0.4155251, shape=(), dtype=float32)
tf.Tensor(0.41321823, shape=(), dtype=float32)
tf.Tensor(0.4109319, shape=(), dtype=float32)
tf.Tensor(0.40866604, shape=(), dtype=float32)
tf.Tensor(0.40642032, shape=(), dtype=float32)
tf.Tensor(0.40419456, shape=(), dtype=float32)
tf.Tensor(0.4019885, shape=(), dtype=float32)
tf.Tensor(0.399802, shape=(), dtype=float32)
tf.Tensor(0.39763477, shape=(), dtype=float32)
tf.Tensor(0.3954866, shape=(), dtype=float32)
tf.Tensor(0.39335734, shape=(), dtype=float32)
tf.Tensor(0.3912467, shape=(), dtype=float32)
tf.Tensor(0.38915452, shape=(), dtype=float32)
tf.Tensor(0.38708064, shape=(), dtype=float32)
tf.Tensor(0.38502476, shape=(), dtype=float32)
tf.Tensor(0.38298675, shape=(), dtype=float32)
tf.Tensor(0.38096642, shape=(), dtype=float32)
tf.Tensor(0.3789635, shape=(), dtype=float32)
tf.Tensor(0.3769779, shape=(), dtype=float32)
tf.Tensor(0.37500936, shape=(), dtype=float32)

No exemplo acima, dist_dataset sobre o dist_dataset para fornecer informações ao seu treinamento. Também fornecemos o tf.distribute.Strategy.make_experimental_numpy_dataset para suportar entradas numpy. Você pode usar esta API para criar um conjunto de dados antes de chamar tf.distribute.Strategy.experimental_distribute_dataset .

Outra maneira de iterar sobre seus dados é usar explicitamente iteradores. Você pode fazer isso quando quiser executar um determinado número de etapas, em vez de iterar por todo o conjunto de dados. A iteração acima agora seria modificada para primeiro criar um iterador e, em seguida, chamar explicitamente a next para obter os dados de entrada.

 iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
 
tf.Tensor(0.37305772, shape=(), dtype=float32)
tf.Tensor(0.3711228, shape=(), dtype=float32)
tf.Tensor(0.3692044, shape=(), dtype=float32)
tf.Tensor(0.36730233, shape=(), dtype=float32)
tf.Tensor(0.3654165, shape=(), dtype=float32)
tf.Tensor(0.36354658, shape=(), dtype=float32)
tf.Tensor(0.36169255, shape=(), dtype=float32)
tf.Tensor(0.3598542, shape=(), dtype=float32)
tf.Tensor(0.35803124, shape=(), dtype=float32)
tf.Tensor(0.3562236, shape=(), dtype=float32)

Isso abrange o caso mais simples de usar a API tf.distribute.Strategy para distribuir loops de treinamento personalizados. Estamos no processo de melhorar essas APIs. Como esse caso de uso exige mais trabalho para adaptar seu código, publicaremos um guia detalhado separado no futuro.

O que é suportado agora?

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Loop de treinamento personalizado Suportado Suportado Suporte experimental Suporte experimental Suporte planejado pós 2.3

Exemplos e tutoriais

Aqui estão alguns exemplos para usar a estratégia de distribuição com loops de treinamento personalizados:

  1. Tutorial para treinar MNIST usando MirroredStrategy .
  2. Guia sobre o treinamento do MNIST usando o TPUStrategy .
  3. TensorFlow Modelo Garden repositório contendo coleções de modelos state-of-the-art implementadas usando várias estratégias.

Usando tf.distribute.Strategy com Estimator (suporte limitado)

tf.estimator é uma API TensorFlow de treinamento distribuído que originalmente suportava a abordagem do servidor de parâmetros assíncronos. Assim como Keras, integramos o tf.distribute.Strategy no tf.Estimator . Se você estiver usando o Estimator para seu treinamento, poderá mudar facilmente para treinamento distribuído com muito poucas alterações no seu código. Com isso, os usuários do Estimator agora podem fazer treinamento distribuído síncrono em várias GPUs e vários trabalhadores, além de usar TPUs. Esse suporte no Estimator é, no entanto, limitado. Consulte a seção O que é suportado agora abaixo para obter mais detalhes.

O uso do tf.distribute.Strategy with Estimator é um pouco diferente do caso Keras. Em vez de usar strategy.scope , agora passamos o objeto de estratégia para o RunConfig do Estimator.

Aqui está um trecho de código que mostra isso com um LinearRegressor e MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp2ack9oru
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp2ack9oru', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Usamos um estimador pré-fabricado aqui, mas o mesmo código também funciona com um estimador personalizado. train_distribute determina como o treinamento será distribuído e eval_distribute determina como a avaliação será distribuída. Essa é outra diferença de Keras, onde usamos a mesma estratégia para treinamento e avaliação.

Agora podemos treinar e avaliar este estimador com uma função de entrada:

 def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
 
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-08-04T20:28:12Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp2ack9oru/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.20350s
INFO:tensorflow:Finished evaluation at 2020-08-04-20:28:12
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmp2ack9oru/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Outra diferença a destacar aqui entre o Estimator e o Keras é a manipulação de entrada. No Keras, mencionamos que cada lote do conjunto de dados é dividido automaticamente entre várias réplicas. No Estimator, no entanto, não fazemos divisão automática de lote, nem fragmentamos automaticamente os dados entre diferentes trabalhadores. Você tem controle total sobre como deseja que seus dados sejam distribuídos entre trabalhadores e dispositivos e deve fornecer um input_fn para especificar como distribuir seus dados.

Seu input_fn é chamado uma vez por trabalhador, fornecendo um conjunto de dados por trabalhador. Em seguida, um lote desse conjunto de dados é alimentado em uma réplica nesse trabalhador, consumindo N lotes para N réplicas em um trabalhador. Em outras palavras, o conjunto de dados retornado pelo input_fn deve fornecer lotes de tamanho PER_REPLICA_BATCH_SIZE . E o tamanho global do lote para uma etapa pode ser obtido como PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

Ao fazer um treinamento para vários trabalhadores, você deve dividir seus dados entre os trabalhadores ou embaralhar com uma semente aleatória em cada um. Você pode ver um exemplo de como fazer isso no Treinamento para trabalhadores múltiplos com estimador .

Da mesma forma, você também pode usar estratégias de vários trabalhadores e servidores de parâmetros. O código permanece o mesmo, mas você precisa usar tf.estimator.train_and_evaluate e definir variáveis ​​de ambiente TF_CONFIG para cada binário em execução no seu cluster.

O que é suportado agora?

Há suporte limitado ao treinamento com o Estimator usando todas as estratégias, exceto a estratégia TPUStrategy . O treinamento e a avaliação básicos devem funcionar, mas vários recursos avançados, como o andaime, ainda não funcionam. Também pode haver vários erros nessa integração. No momento, não planejamos melhorar ativamente esse suporte. Em vez disso, estamos focados no Keras e no suporte ao loop de treinamento personalizado. Se possível, você deve preferir usar tf.distribute com essas APIs.

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API do estimador Suporte limitado Não suportado Suporte limitado Suporte limitado Suporte limitado

Exemplos e tutoriais

Aqui estão alguns exemplos que mostram o uso completo de várias estratégias com o Estimator:

  1. Treinamento para vários trabalhadores com o Estimator para treinar MNIST com vários trabalhadores usando o MultiWorkerMirroredStrategy .
  2. Exemplo de ponta a ponta para treinamento de vários trabalhadores no fluxo de tensor / ecossistema usando modelos Kubernetes. Este exemplo começa com um modelo Keras e o converte em um tf.keras.estimator.model_to_estimator usando a API tf.keras.estimator.model_to_estimator .
  3. Modelo oficial ResNet50 , que pode ser treinado usando o MirroredStrategy ou o MultiWorkerMirroredStrategy .

Outros tópicos

Nesta seção, abordaremos alguns tópicos relevantes para vários casos de uso.

Configurando a variável de ambiente TF_CONFIG

Para treinamento para vários trabalhadores, como mencionado anteriormente, você precisa definir a variável de ambiente TF_CONFIG para cada binário em execução no seu cluster. A variável de ambiente TF_CONFIG é uma sequência JSON que especifica quais tarefas constituem um cluster, seus endereços e a função de cada tarefa no cluster. Nós fornecemos um modelo Kubernetes no repositório tensorflow / ecossistema que define TF_CONFIG para suas tarefas de treinamento.

Existem dois componentes do TF_CONFIG: cluster e tarefa. cluster fornece informações sobre o cluster de treinamento, que é um ditado que consiste em diferentes tipos de tarefas, como worker. No treinamento para vários trabalhadores, geralmente há um trabalhador que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever o arquivo de resumo do TensorBoard, além do que um trabalhador comum faz. Esse trabalhador é chamado de trabalhador 'chefe' e é habitual que o trabalhador com índice 0 seja nomeado como trabalhador principal (na verdade, é assim que tf.distribute.Strategy é implementado). A tarefa, por outro lado, fornece informações sobre a tarefa atual. O primeiro cluster de componentes é o mesmo para todos os trabalhadores, e a tarefa do segundo componente é diferente em cada trabalhador e especifica o tipo e o índice desse trabalhador.

Um exemplo de TF_CONFIG é:

 os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
 

Este TF_CONFIG especifica que há três tarefas de trabalho e duas de ps no cluster, juntamente com seus hosts e portas. A parte "tarefa" especifica que a função da tarefa atual no cluster, trabalhador 1 (o segundo trabalhador). As funções válidas em um cluster são "chefe", "trabalhador", "ps" e "avaliador". Não deve haver trabalho "ps", exceto ao usar tf.distribute.experimental.ParameterServerStrategy .

Qual é o próximo?

tf.distribute.Strategy está ativamente em desenvolvimento. Convidamos você a experimentá-lo e fornecer seus comentários usando os problemas do GitHub .