O Dia da Comunidade de ML é dia 9 de novembro! Junte-nos para atualização de TensorFlow, JAX, e mais Saiba mais

Treinamento multi-trabalhador com Estimator

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra como tf.distribute.Strategy pode ser utilizado para a formação multi-trabalhador distribuído com tf.estimator . Se você escrever seu código usando tf.estimator , e você estiver interessado em escala além de uma única máquina com alto desempenho, este tutorial é para você.

Antes de começar, leia a estratégia de distribuição guia. O tutorial de treinamento multi-GPU também é relevante, porque este tutorial usa o mesmo modelo.

Configurar

Primeiro, configure o TensorFlow e as importações necessárias.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Função de entrada

Este tutorial usa o conjunto de dados MNIST de TensorFlow conjuntos de dados . O código aqui é semelhante ao tutorial de treinamento multi-GPU com diferença uma chave: quando se utiliza Estimador para a formação multi-trabalhador, é necessário fragmentar o conjunto de dados pelo número de trabalhadores para assegurar a convergência do modelo. O dado de entrada é Sharded pelo índice de trabalhador, de modo que cada trabalhador processa 1/num_workers porções distintas do conjunto de dados.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Outra abordagem razoável para alcançar a convergência seria embaralhar o conjunto de dados com sementes distintas em cada trabalhador.

Configuração de vários trabalhadores

Uma das principais diferenças neste tutorial (em comparação com o tutorial de treinamento multi-GPU ) é a configuração multi-trabalhador. O TF_CONFIG variável ambiente é a maneira padrão de especificar a configuração de cluster a cada trabalhador que faz parte do aglomerado.

Há dois componentes de TF_CONFIG : cluster e task . cluster fornece informações sobre todo o cluster, ou seja, os trabalhadores e servidores de parâmetro no cluster. task fornece informações sobre a tarefa atual. O primeiro componente cluster é o mesmo para todos os trabalhadores e servidores de parâmetros do cluster, e o segundo componente task é diferente em cada servidor do trabalhador e parâmetro e especifica o seu próprio type e index . Neste exemplo, a tarefa type é worker e a tarefa index é 0 .

Para fins de ilustração, Este tutorial mostra como definir um TF_CONFIG com 2 trabalhadores em localhost . Na prática, você criaria vários trabalhadores em um endereço IP externo e porta, e um conjunto TF_CONFIG em cada trabalhador de forma adequada, ou seja, modificar a tarefa index .

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Defina o modelo

Escreva as camadas, o otimizador e a função de perda para o treinamento. Este tutorial define o modelo com camadas Keras, semelhante ao tutorial de treinamento multi-GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Para treinar o modelo, use uma instância de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy cria cópias de todas as variáveis em camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps , uma op TensorFlow para comunicação coletiva, a gradientes de agregados e manter as variáveis em sincronia. O tf.distribute.Strategy guia tem mais detalhes sobre esta estratégia.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_1884/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Treine e avalie o modelo

Em seguida, especificar a estratégia de distribuição no RunConfig para o estimador, e treinar e avaliar invocando tf.estimator.train_and_evaluate . Este tutorial distribui apenas a formação especificando a estratégia via train_distribute . Também é possível distribuir a avaliação via eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7fa86c4c8950>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:374: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2021-09-09 01:25:08.941607: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2021-09-09 01:25:08.942715: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:loss = 2.3013024, step = 0
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:global_step/sec: 296.028
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:loss = 2.3011568, step = 100 (0.340 sec)
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:global_step/sec: 325.74
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:loss = 2.3059464, step = 200 (0.307 sec)
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:global_step/sec: 317.605
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:loss = 2.296136, step = 300 (0.315 sec)
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:global_step/sec: 330.313
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:loss = 2.2860022, step = 400 (0.303 sec)
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:global_step/sec: 341.402
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:loss = 2.2717395, step = 500 (0.292 sec)
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:global_step/sec: 342.721
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:loss = 2.289622, step = 600 (0.292 sec)
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:global_step/sec: 328.597
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:loss = 2.2841775, step = 700 (0.304 sec)
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:global_step/sec: 345.242
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:loss = 2.2770503, step = 800 (0.289 sec)
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:global_step/sec: 721.717
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:loss = 2.255022, step = 900 (0.138 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Starting evaluation at 2021-09-09T01:25:24
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Inference Time : 1.34031s
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Finished evaluation at 2021-09-09-01:25:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2692595
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.135354.
INFO:tensorflow:Loss for final step: 1.135354.
({'loss': 2.2692595, 'global_step': 938}, [])

Otimize o desempenho do treinamento

Agora você tem um modelo e um Estimador capaz multi-trabalhador alimentado por tf.distribute.Strategy . Você pode tentar as seguintes técnicas para otimizar o desempenho do treinamento de vários trabalhadores:

  • Aumentar o tamanho do lote: O tamanho do lote especificado aqui é por GPU. Em geral, o maior tamanho de lote que cabe na memória da GPU é aconselhável.
  • Variáveis elenco: lançar as variáveis a tf.float se possível. O modelo oficial ResNet inclui um exemplo de como isso pode ser feito.
  • Use comunicação coletiva: MultiWorkerMirroredStrategy fornece várias implementações de comunicação coletivos .

    • RING colectivos baseados no anel implementos usando gRPC como camada de comunicação entre o hospedeiro.
    • NCCL usa NCCL da Nvidia para implementar coletivos.
    • AUTO adia a escolha para o tempo de execução.

    A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique um valor válido para a communication parâmetro de MultiWorkerMirroredStrategy construtor 's, por exemplo, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Visite a seção Desempenho no guia para aprender mais sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho de seus modelos TensorFlow.

Outros exemplos de código

  1. Terminar a exemplo final para formação de trabalhadores de vários em tensorflow / ecossistema usando modelos Kubernetes. Este exemplo começa com um modelo Keras e converte-lo para um estimador usando o tf.keras.estimator.model_to_estimator API.
  2. Modelos oficiais , muitos dos quais podem ser configurados para executar múltiplas estratégias de distribuição.