Treinamento multi-trabalhador com Estimator

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra como tf.distribute.Strategy pode ser usado para treinamento distribuído de vários trabalhadores com tf.estimator . Se você escreve seu código usando tf.estimator e está interessado em escalar além de uma única máquina com alto desempenho, este tutorial é para você.

Antes de começar, leia o guia de estratégia de distribuição . O tutorial de treinamento multi-GPU também é relevante, porque este tutorial usa o mesmo modelo.

Configurar

Primeiro, configure o TensorFlow e as importações necessárias.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Função de entrada

Este tutorial usa o conjunto de dados MNIST dos conjuntos de dados TensorFlow . O código aqui é semelhante ao tutorial de treinamento multi-GPU, com uma diferença fundamental: ao usar o Estimator para treinamento multiusuário, é necessário fragmentar o conjunto de dados pelo número de trabalhadores para garantir a convergência do modelo. Os dados de entrada são fragmentados pelo índice do trabalhador, de modo que cada trabalhador processe 1/num_workers porções distintas do conjunto de dados.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Outra abordagem razoável para alcançar a convergência seria embaralhar o conjunto de dados com sementes distintas em cada trabalhador.

Configuração de vários trabalhadores

Uma das principais diferenças neste tutorial (em comparação com o tutorial de treinamento multi-GPU ) é a configuração de vários trabalhadores. A variável de ambiente TF_CONFIG é a maneira padrão de especificar a configuração do cluster para cada trabalhador que faz parte do cluster.

Existem dois componentes de TF_CONFIG : cluster e task . cluster fornece informações sobre todo o cluster, ou seja, os trabalhadores e os servidores de parâmetros no cluster. task fornece informações sobre a tarefa atual. O primeiro cluster componente é o mesmo para todos os trabalhadores e servidores de parâmetros no cluster, e a segunda task componente é diferente em cada trabalhador e servidor de parâmetros e especifica seu próprio type e index . Neste exemplo, o type tarefa é worker e o index tarefa é 0 .

Para fins de ilustração, este tutorial mostra como definir um TF_CONFIG com 2 trabalhadores no localhost . Na prática, você criaria vários trabalhos em um endereço IP externo e porta e TF_CONFIG em cada trabalho de forma adequada, ou seja, modificaria o index da tarefa.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Defina o modelo

Escreva as camadas, o otimizador e a função de perda para o treinamento. Este tutorial define o modelo com camadas Keras, semelhante ao tutorial de treinamento multi-GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Para treinar o modelo, use uma instância de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy cria cópias de todas as variáveis ​​nas camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps , um TensorFlow op para comunicação coletiva, para agregar gradientes e manter as variáveis ​​em sincronia. O guia tf.distribute.Strategy contém mais detalhes sobre essa estratégia.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From <ipython-input-1-f1f424df316e>:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Treine e avalie o modelo

Em seguida, especifique a estratégia de distribuição em RunConfig para o estimador e treine e avalie invocando tf.estimator.train_and_evaluate . Este tutorial distribui apenas o treinamento especificando a estratégia via train_distribute . Também é possível distribuir a avaliação via eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7eff5c6afe50>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.141045.
INFO:tensorflow:Loss for final step: 1.141045.
({'loss': 2.2780812, 'global_step': 938}, [])

Otimize o desempenho do treinamento

Agora você tem um modelo e um Estimator com capacidade para vários trabalhadores, desenvolvido por tf.distribute.Strategy . Você pode tentar as seguintes técnicas para otimizar o desempenho do treinamento de vários trabalhadores:

  • Aumentar o tamanho do lote: o tamanho do lote especificado aqui é por GPU. Em geral, o maior tamanho de lote que cabe na memória da GPU é aconselhável.
  • Variáveis elenco: lançar as variáveis a tf.float se possível. O modelo oficial do ResNet inclui um exemplo de como isso pode ser feito.
  • Use a comunicação coletiva: MultiWorkerMirroredStrategy fornece várias implementações de comunicação coletiva .

    • RING implementa coletivos baseados em anel usando gRPC como a camada de comunicação entre hosts.
    • NCCL usa a NCCL da Nvidia para implementar coletivos.
    • AUTO transfere a escolha para o tempo de execução.

    A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique um valor válido para o parâmetro de communication do MultiWorkerMirroredStrategy , por exemplo, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Visite a seção Desempenho do guia para saber mais sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.

Outros exemplos de código

  1. Exemplo de ponta a ponta para treinamento de vários trabalhadores em tensorflow / ecossistema usando modelos Kubernetes. Este exemplo começa com um modelo Keras e o converte em um Estimator usando a API tf.keras.estimator.model_to_estimator .
  2. Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.