Treinamento distribuído com TensorFlow

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

tf.distribute.Strategy é uma API TensorFlow para distribuir formação em várias GPUs, várias máquinas, ou TPU. Usando esta API, você pode distribuir seus modelos existentes e código de treinamento com mudanças mínimas de código.

tf.distribute.Strategy foi concebido com esses objetivos-chave em mente:

  • Fácil de usar e oferecer suporte a vários segmentos de usuários, incluindo pesquisadores, engenheiros de aprendizado de máquina, etc.
  • Fornece bom desempenho fora da caixa.
  • Troca fácil entre estratégias.

tf.distribute.Strategy pode ser usado com um API de alto nível como Keras , e também pode ser usado para distribuir laços de treinamento personalizado (e, em geral, qualquer computação usando TensorFlow).

Em TensorFlow 2.x, você pode executar seus programas ansiosamente, ou em um gráfico usando tf.function . tf.distribute.Strategy pretende apoiar ambos os modos de execução, mas funciona melhor com tf.function . Modo ansioso só é recomendado para fins de depuração e não suportados para tf.distribute.TPUStrategy . Embora o treinamento seja o foco deste guia, esta API também pode ser usada para distribuir avaliação e predição em diferentes plataformas.

Você pode usar tf.distribute.Strategy com muito poucas alterações em seu código, porque os componentes subjacentes do TensorFlow foram alterados para tornar a estratégia-aware. Isso inclui variáveis, camadas, modelos, otimizadores, métricas, resumos e pontos de verificação.

Neste guia, você aprenderá sobre vários tipos de estratégias e como pode usá-las em diferentes situações. Para saber como depurar problemas de desempenho, consulte o desempenho Optimize TensorFlow GPU guia.

# Import TensorFlow
import tensorflow as tf

Tipos de estratégias

tf.distribute.Strategy pretende abranger uma série de casos de uso ao longo de diferentes eixos. Algumas dessas combinações são atualmente suportadas e outras serão adicionadas no futuro. Alguns desses eixos são:

  • Synchronous contra o treinamento assíncrono: Estas são duas formas comuns de distribuição de treinamento com paralelismo de dados. No treinamento de sincronização, todos os funcionários treinam em diferentes fatias de dados de entrada em sincronia e agregam gradientes em cada etapa. No treinamento assíncrono, todos os trabalhadores treinam independentemente sobre os dados de entrada e atualizam as variáveis ​​de forma assíncrona. Normalmente, o treinamento de sincronização é suportado por meio de redução total e assíncrono por meio de arquitetura de servidor de parâmetro.
  • Plataforma de hardware: Você pode querer expandir a sua formação em múltiplas GPUs em uma máquina, ou várias máquinas em uma rede (com 0 ou mais GPUs cada), ou na nuvem TPUs.

Para oferecer suporte a esses casos de uso, existem 6 estratégias disponíveis. A próxima seção explica quais deles são compatíveis em quais cenários no TensorFlow. Aqui está uma visão geral rápida:

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API Keras Suportado Suportado Suportado Suporte Experimental Suporte Experimental
Loop de treinamento personalizado Suportado Suportado Suportado Suporte Experimental Suporte Experimental
API Estimator Suporte Limitado Não suportado Suporte Limitado Suporte Limitado Suporte Limitado

MirroredStrategy

tf.distribute.MirroredStrategy suportes síncronos distribuídos formação em várias GPUs em uma máquina. Ele cria uma réplica por dispositivo GPU. Cada variável no modelo é espelhada em todas as réplicas. Juntas, essas variáveis formar uma única variável conceitual chamado MirroredVariable . Essas variáveis ​​são mantidas em sincronia entre si aplicando atualizações idênticas.

Algoritmos eficientes de redução total são usados ​​para comunicar as atualizações de variáveis ​​entre os dispositivos. A redução total agrega tensores em todos os dispositivos, adicionando-os e tornando-os disponíveis em cada dispositivo. É um algoritmo fundido muito eficiente e pode reduzir significativamente a sobrecarga de sincronização. Existem muitos algoritmos de redução total e implementações disponíveis, dependendo do tipo de comunicação disponível entre os dispositivos. Por padrão, ele usa a Biblioteca Comunicação Collective NVIDIA ( NCCL ) como a implementação de todos os reduzir. Você pode escolher entre algumas outras opções ou escrever sua própria.

Aqui é a maneira mais simples de criar MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
2021-08-21 01:24:44.677825: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.686081: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.687041: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.689423: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-21 01:24:44.690022: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.690987: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:44.691896: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.284404: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.285446: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.286341: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-21 01:24:45.287150: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 14648 MB memory:  -> device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Isto irá criar um MirroredStrategy exemplo, o qual irá utilizar todas as GPUs que são visíveis para TensorFlow, e NCCL-como a comunicação entre dispositivos.

Se desejar usar apenas algumas das GPUs em sua máquina, você pode fazer assim:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Se você deseja substituir o dispositivo de comunicação cruz, você pode fazer isso usando o cross_device_ops argumento através do fornecimento de uma instância de tf.distribute.CrossDeviceOps . Atualmente, tf.distribute.HierarchicalCopyAllReduce e tf.distribute.ReductionToOneDevice são outros do que duas opções tf.distribute.NcclAllReduce , que é o padrão.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy permite que você execute o seu treinamento TensorFlow em Tensor Processing Units (TPU). TPUs são ASICs especializados do Google, projetados para acelerar drasticamente as cargas de trabalho de aprendizado de máquina. Eles estão disponíveis no Google Colab , o TPU Research Nuvem e Nuvem TPU .

Em termos de arquitetura formação distribuído, TPUStrategy é o mesmo MirroredStrategy -é implementos síncronos formação distribuída. TPUs fornecer sua própria implementação de eficientes tudo reduzir e outras operações coletivas em vários núcleos de TPU, que são usados em TPUStrategy .

Aqui está como você iria instanciar TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

O TPUClusterResolver exemplo ajuda a localizar o TPUs. No Colab, você não precisa especificar nenhum argumento para ele.

Se você quiser usar isso para Cloud TPUs:

  • Você deve especificar o nome do seu recurso TPU na tpu argumento.
  • Você deve inicializar o sistema TPU explicitamente no início do programa. Isso é necessário para que as TPUs possam ser usadas para computação. A inicialização do sistema tpu também apaga a memória da TPU, por isso é importante concluir esta etapa primeiro para evitar a perda de estado.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy é muito semelhante ao MirroredStrategy . Ele implementa treinamento distribuído síncrono entre vários funcionários, cada um com potencialmente várias GPUs. Semelhante ao tf.distribute.MirroredStrategy , cria cópias de todas as variáveis no modelo em cada dispositivo em todos os trabalhadores.

Aqui é a maneira mais simples de criar MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy tem duas implementações para comunicações entre dispositivos. CommunicationImplementation.RING é RPC baseados e suporta tanto CPUs e GPUs. CommunicationImplementation.NCCL usa NCCL e fornece desempenho estado-da-arte em GPUs, mas ele não suporta CPUs. CollectiveCommunication.AUTO adia a escolha para Tensorflow. Você pode especificá-los da seguinte maneira:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Uma das principais diferenças para iniciar o treinamento de vários trabalhadores, em comparação com o treinamento de multi-GPU, é a configuração de vários trabalhadores. O TF_CONFIG variável ambiente é a maneira padrão em TensorFlow para especificar a configuração de cluster a cada trabalhador que faz parte do aglomerado. Saiba mais sobre a criação de TF_CONFIG .

ParameterServerStrategy

O treinamento do servidor de parâmetros é um método comum de dados paralelos para dimensionar o treinamento do modelo em várias máquinas. Um cluster de treinamento de servidor de parâmetro consiste em trabalhadores e servidores de parâmetro. As variáveis ​​são criadas em servidores de parâmetros e são lidas e atualizadas pelos trabalhadores em cada etapa. Confira a formação servidor Parâmetro tutorial para mais detalhes.

Em TensorFlow 2, formação servidor parâmetro usa uma arquitetura central baseado coordenador via tf.distribute.experimental.coordinator.ClusterCoordinator classe.

Nesta implementação, os worker e parameter server tarefas executar tf.distribute.Server s que escuta para as tarefas do coordenador. O coordenador cria recursos, despacha tarefas de treinamento, escreve pontos de verificação e lida com falhas de tarefas.

Na programação em execução no coordenador, você vai usar um ParameterServerStrategy objeto para definir uma etapa de treinamento e usar um ClusterCoordinator para etapas de treinamento expedição para trabalhadores remotos. Esta é a maneira mais simples de criá-los:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Em TensorFlow 1, ParameterServerStrategy está disponível apenas com um estimador via tf.compat.v1.distribute.experimental.ParameterServerStrategy símbolo.

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy faz formação síncrona também. As variáveis ​​não são espelhadas; em vez disso, são colocadas na CPU e as operações são replicadas em todas as GPUs locais. Se houver apenas uma GPU, todas as variáveis ​​e operações serão colocadas nessa GPU.

Criar uma instância de CentralStorageStrategy por:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Isto irá criar uma CentralStorageStrategy exemplo que usará todas as GPUs visíveis e CPU. A atualização das variáveis ​​nas réplicas será agregada antes de ser aplicada às variáveis.

Outras estratégias

Além das estratégias acima, existem duas outras estratégias que podem ser úteis para o protótipo e depuração quando usando tf.distribute APIs.

Estratégia Padrão

A Estratégia Padrão é uma estratégia de distribuição que está presente quando nenhuma estratégia de distribuição explícita está no escopo. Ele implementa o tf.distribute.Strategy interface, mas é um repasse e não fornece nenhuma distribuição real. Por exemplo, strategy.run(fn) irá simplesmente chamar fn . O código escrito usando essa estratégia deve se comportar exatamente como o código escrito sem nenhuma estratégia. Você pode pensar nisso como uma estratégia "sem operação".

A estratégia padrão é um singleton - e não se pode criar mais instâncias dela. Ele pode ser obtido usando tf.distribute.get_strategy fora do âmbito de qualquer estratégia explícita (a mesma API que pode ser usado para obter a estratégia atual dentro âmbito de uma estratégia explícita).

default_strategy = tf.distribute.get_strategy()

Essa estratégia serve a dois propósitos principais:

  • Ele permite escrever código de biblioteca com reconhecimento de distribuição incondicionalmente. Por exemplo, em tf.optimizer s você pode usar tf.distribute.get_strategy e usar essa estratégia para reduzir gradientes-lo sempre retornará um objeto estratégia em que você pode chamar o Strategy.reduce API.
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Semelhante ao código de biblioteca, ele pode ser usado para escrever programas de usuários finais para trabalhar com e sem estratégia de distribuição, sem exigir lógica condicional. Aqui está um exemplo de snippet de código que ilustra isso:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy é uma estratégia para colocar todas as variáveis e computação em um único dispositivo especificado.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Essa estratégia é diferente da estratégia padrão de várias maneiras. Na estratégia padrão, a lógica de colocação de variável permanece inalterada quando comparada à execução do TensorFlow sem qualquer estratégia de distribuição. Mas ao usar OneDeviceStrategy , todas as variáveis criadas no seu âmbito são explicitamente colocado no dispositivo especificado. Além disso, quaisquer funções chamadas através OneDeviceStrategy.run também irá ser colocado sobre o dispositivo especificado.

A entrada distribuída por meio dessa estratégia será pré-buscada para o dispositivo especificado. Na estratégia padrão, não há distribuição de entrada.

Semelhante à estratégia padrão, esta estratégia também pode ser usada para testar seu código antes de mudar para outras estratégias que realmente são distribuídas para vários dispositivos / máquinas. Isto irá exercer a maquinaria estratégia de distribuição um pouco mais do que a estratégia padrão, mas não em toda a extensão da utilização, por exemplo, MirroredStrategy ou TPUStrategy . Se você quiser um código que se comporte como se não houvesse estratégia, use a Estratégia Padrão.

Até agora, você viu as diferentes estratégias disponíveis e como pode instanciá-las. As próximas seções mostram as diferentes maneiras em que você pode usá-los para distribuir seu treinamento.

Usando tf.distribute.Strategy com tf.keras.Model.fit

tf.distribute.Strategy está integrado tf.keras , que é a implementação do da TensorFlow especificação API Keras . tf.keras é uma API de alto nível para modelos de construção e de trem. Ao integrar em tf.keras backend, é perfeita para você para distribuir seu treinamento escrito no quadro de formação Keras usando Model.fit .

Aqui está o que você precisa alterar em seu código:

  1. Criar uma instância da apropriado tf.distribute.Strategy .
  2. Mova a criação de Keras modelo, otimizador e métricas dentro strategy.scope .

As estratégias de distribuição do TensorFlow oferecem suporte a todos os tipos de modelos Keras: sequencial, funcional e subclasse.

Aqui está um trecho de código para fazer isso para um modelo Keras muito simples com uma camada densa:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Este exemplo usa MirroredStrategy , para que possa executar este em uma máquina com múltiplas GPUs. strategy.scope() indica a Keras qual estratégia usar para distribuir o treinamento. A criação de modelos / otimizadores / métricas dentro deste escopo permite que você crie variáveis ​​distribuídas em vez de variáveis ​​regulares. Assim que estiver configurado, você pode ajustar seu modelo como faria normalmente. MirroredStrategy cuida de replicar o treinamento do modelo nas GPUs disponíveis, agregando gradientes, e muito mais.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-08-21 01:24:46.237677: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-08-21 01:24:46.271153: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 0.0086
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0038
2021-08-21 01:24:49.147347: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.0024
0.002372059039771557

Aqui, um tf.data.Dataset fornece o treinamento ea entrada eval. Você também pode usar matrizes NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.0017
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 7.4622e-04
2021-08-21 01:24:50.486957: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
<keras.callbacks.History at 0x7f12401ede10>

Em ambos os casos com- Dataset ou Numpy-cada lote da entrada dada é dividida igualmente entre as várias réplicas. Por exemplo, se você estiver usando o MirroredStrategy com 2 GPUs, cada lote de tamanho 10 vai ficar dividido entre os 2 GPUs, com cada um recebendo 5 exemplos de entrada em cada etapa. Cada época será treinada mais rápido conforme você adiciona mais GPUs. Normalmente, você deseja aumentar o tamanho do lote à medida que adiciona mais aceleradores, para fazer uso eficaz do poder de computação extra. Você também precisará reajustar sua taxa de aprendizado, dependendo do modelo. Você pode usar strategy.num_replicas_in_sync para obter o número de réplicas.

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

O que é compatível agora?

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
APIs Keras Suportado Suportado Suportado Suporte Experimental Suporte Experimental

Exemplos e tutoriais

Aqui está uma lista de tutoriais e exemplos que ilustram a integração acima de ponta a ponta com Keras:

  1. Tutorial para treinar MNIST com MirroredStrategy .
  2. Tutorial para treinar MNIST usando MultiWorkerMirroredStrategy .
  3. Guia sobre MNIST treinamento usando TPUStrategy .
  4. Tutorial para o treinamento servidor parâmetro no TensorFlow 2 com ParameterServerStrategy .
  5. TensorFlow Modelo Garden repositório contendo coleções de modelos state-of-the-art implementadas usando várias estratégias.

Usando tf.distribute.Strategy com loops de treinamento personalizado

Como você viu, usando tf.distribute.Strategy com Keras model.fit requer mudando apenas algumas linhas de código. Com um pouco de esforço mais, você também pode usar tf.distribute.Strategy com loops de treinamento personalizado.

Se você precisa de mais flexibilidade e controle sobre seus loops de treinamento do que é possível com o Estimator ou Keras, você pode escrever loops de treinamento personalizados. Por exemplo, ao usar um GAN, você pode querer realizar um número diferente de etapas de gerador ou discriminador a cada rodada. Da mesma forma, as estruturas de alto nível não são muito adequadas para o treinamento de Aprendizado por Reforço.

Os tf.distribute.Strategy classes fornecem um conjunto de métodos para loops de treinamento personalizado de apoio. O uso deles pode exigir uma pequena reestruturação do código inicialmente, mas depois que isso for feito, você poderá alternar entre GPUs, TPUs e várias máquinas simplesmente alterando a instância da estratégia.

Aqui, você verá um breve trecho ilustrando este caso de uso para um exemplo de treinamento simples usando o mesmo modelo Keras de antes.

Primeiro, crie o modelo e o otimizador dentro do escopo da estratégia. Isso garante que quaisquer variáveis ​​criadas com o modelo e otimizador sejam variáveis ​​espelhadas.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Em seguida, crie o conjunto de dados de entrada e chamar tf.distribute.Strategy.experimental_distribute_dataset para distribuir o conjunto de dados com base na estratégia.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-08-21 01:24:50.715370: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Em seguida, defina uma etapa do treinamento. Use tf.GradientTape a gradientes de computação e otimizador para aplicar esses gradientes para atualizar as variáveis do seu modelo. Para distribuir esta etapa de treinamento, colocá-lo em uma função train_step e passá-lo para tf.distrbute.Strategy.run juntamente com as entradas do conjunto de dados que você tem do dist_dataset criado antes:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Algumas outras coisas a serem observadas no código acima:

  1. Você usou tf.nn.compute_average_loss para calcular a perda. tf.nn.compute_average_loss resume a perda por exemplo e divide a soma pelo global_batch_size. Isto é importante porque mais tarde, após os gradientes são calculados sobre cada réplica, eles são agregados em todo o réplicas pela soma deles.
  2. Você também usou o tf.distribute.Strategy.reduce API para agregar os resultados retornados pelo tf.distribute.Strategy.run . tf.distribute.Strategy.run retorna resultados de cada réplica local na estratégia, e há várias maneiras de consumir este resultado. Você pode reduce -los para obter um valor agregado. Você também pode fazer tf.distribute.Strategy.experimental_local_results para obter a lista de valores contidos no resultado, um por réplica local.
  3. Quando você chama apply_gradients dentro de um escopo estratégia de distribuição, o seu comportamento é modificado. Especificamente, antes de aplicar gradientes em cada instância paralela durante o treinamento síncrono, ele executa uma soma de todas as réplicas dos gradientes.

Finalmente, depois de ter definido a etapa de treinamento, você pode iterar sobre dist_dataset e executar o treinamento em um loop:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(1.3084135, shape=(), dtype=float32)
tf.Tensor(1.2977839, shape=(), dtype=float32)
tf.Tensor(1.2872384, shape=(), dtype=float32)
tf.Tensor(1.2767767, shape=(), dtype=float32)
tf.Tensor(1.2663989, shape=(), dtype=float32)
tf.Tensor(1.256105, shape=(), dtype=float32)
tf.Tensor(1.2458944, shape=(), dtype=float32)
tf.Tensor(1.2357674, shape=(), dtype=float32)
tf.Tensor(1.2257235, shape=(), dtype=float32)
tf.Tensor(1.2157627, shape=(), dtype=float32)
tf.Tensor(1.2058848, shape=(), dtype=float32)
tf.Tensor(1.1960893, shape=(), dtype=float32)
tf.Tensor(1.1863762, shape=(), dtype=float32)
tf.Tensor(1.1767453, shape=(), dtype=float32)
tf.Tensor(1.1671963, shape=(), dtype=float32)
tf.Tensor(1.1577287, shape=(), dtype=float32)
tf.Tensor(1.1483426, shape=(), dtype=float32)
tf.Tensor(1.1390375, shape=(), dtype=float32)
tf.Tensor(1.1298131, shape=(), dtype=float32)
tf.Tensor(1.1206692, shape=(), dtype=float32)

No exemplo acima, você iterada o dist_dataset a contribuir para a sua formação. Está também equipado com o tf.distribute.Strategy.make_experimental_numpy_dataset para suportar entradas Numpy. Você pode usar essa API para criar um conjunto de dados antes de chamar tf.distribute.Strategy.experimental_distribute_dataset .

Outra maneira de iterar seus dados é usar explicitamente iteradores. Você pode querer fazer isso quando quiser executar um determinado número de etapas, em vez de iterar por todo o conjunto de dados. O acima iteração seria agora ser modificado para primeiro criar um iterador e, em seguida, chamar explicitamente next sobre ele para obter os dados introduzidos.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(1.1116053, shape=(), dtype=float32)
tf.Tensor(1.1026212, shape=(), dtype=float32)
tf.Tensor(1.0937165, shape=(), dtype=float32)
tf.Tensor(1.0848908, shape=(), dtype=float32)
tf.Tensor(1.0761441, shape=(), dtype=float32)
tf.Tensor(1.0674756, shape=(), dtype=float32)
tf.Tensor(1.0588851, shape=(), dtype=float32)
tf.Tensor(1.0503721, shape=(), dtype=float32)
tf.Tensor(1.0419363, shape=(), dtype=float32)
tf.Tensor(1.0335773, shape=(), dtype=float32)

Isto cobre o caso mais simples de usar tf.distribute.Strategy API para distribuir laços de treinamento personalizados.

O que é compatível agora?

API de treinamento MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Loop de treinamento personalizado Suportado Suportado Suportado Suporte Experimental Suporte Experimental

Exemplos e tutoriais

Aqui estão alguns exemplos de uso de estratégia de distribuição com loops de treinamento personalizados:

  1. Tutorial para treinar MNIST usando MirroredStrategy .
  2. Guia sobre MNIST treinamento usando TPUStrategy .
  3. TensorFlow Modelo Garden repositório contendo coleções de modelos state-of-the-art implementadas usando várias estratégias.

Outros tópicos

Esta seção cobre alguns tópicos relevantes para vários casos de uso.

Configurando a variável de ambiente TF_CONFIG

Para o treinamento multi-trabalhador, como mencionado antes, você precisa configurar o TF_CONFIG variável de ambiente para cada binário executado em seu cluster. O TF_CONFIG variável de ambiente é uma string JSON que especifica quais as tarefas que constituem um cluster, seus endereços e o papel de cada tarefa no cluster. O tensorflow/ecosystem repo fornece um modelo Kubernetes, que estabelece TF_CONFIG para as suas tarefas de treinamento.

Há dois componentes de TF_CONFIG : um cluster e uma tarefa.

  • Um cluster fornece informações sobre o cluster de treinamento, que é um dicionário que consiste em diferentes tipos de trabalhos, como trabalhadores. No treinamento de vários trabalhadores, geralmente há um trabalhador que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever um arquivo de resumo para o TensorBoard, além do que um trabalhador regular faz. Este trabalhador é referido como o trabalhador "chefe", e é habitual que o trabalhador com o índice 0 é apontado como o chefe do trabalhador (na verdade, esta é a forma como tf.distribute.Strategy é implementado).
  • Por outro lado, uma tarefa fornece informações sobre a tarefa atual. O primeiro cluster de componente é o mesmo para todos os trabalhadores e a segunda tarefa do componente é diferente em cada trabalhador e especifica o tipo e o índice desse trabalhador.

Um exemplo de TF_CONFIG é:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Este TF_CONFIG especifica que há três trabalhadores e dois "ps" tarefas no "cluster" , juntamente com os seus anfitriões e portos. A "task" parte especifica do papel da tarefa actual no "cluster" -worker 1 (segundo o trabalhador). Papéis válidos em um cluster são "chief" , "worker" , "ps" e "evaluator" . Não deve haver "ps" trabalho exceto quando usando tf.distribute.experimental.ParameterServerStrategy .

Qual é o próximo?

tf.distribute.Strategy está ativamente em desenvolvimento. Experimente e fornecer seu feedback usando questões GitHub .