Esta página foi traduzida pela API Cloud Translation.
Switch to English

Treinamento para trabalhadores múltiplos com Keras

Ver em TensorFlow.org Executar no Google Colab Ver fonte no GitHub Download do caderno

Visão geral

Este tutorial demonstra o treinamento distribuído para vários trabalhadores com o modelo Keras usando a API tf.distribute.Strategy , especificamente tf.distribute.experimental.MultiWorkerMirroredStrategy . Com a ajuda dessa estratégia, um modelo Keras projetado para ser executado em trabalhador único pode funcionar perfeitamente em vários trabalhadores com alteração mínima de código.

O guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição que o TensorFlow suporta para aqueles interessados ​​em um entendimento mais profundo das APIs tf.distribute.Strategy .

Configuração

Primeiro, configure o TensorFlow e as importações necessárias.

 import os
import tensorflow as tf
import numpy as np
 

Preparando conjunto de dados

Agora, vamos preparar o conjunto de dados MNIST. O conjunto de dados MNIST compreende 60.000 exemplos de treinamento e 10.000 exemplos de teste dos dígitos manuscritos 0–9, formatados como imagens monocromáticas de 28x28 pixels. Neste exemplo, levaremos a parte de treinamento dos conjuntos de dados para demonstrar.

 def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset
 

Construa o modelo Keras

Aqui usamos a API tf.keras.Sequential para criar e compilar um modelo Keras de redes neurais convolucionais simples para treinar com nosso conjunto de dados MNIST.

 def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
 

Vamos primeiro tentar treinar o modelo para um pequeno número de épocas e observar os resultados em um único trabalhador para garantir que tudo funcione corretamente. Você deve esperar que a perda caia e a precisão se aproxime de 1,0 à medida que a época avança.

 per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 0s 2ms/step - loss: 2.2701 - accuracy: 0.2451
Epoch 2/3
70/70 [==============================] - 0s 2ms/step - loss: 2.1827 - accuracy: 0.4777
Epoch 3/3
70/70 [==============================] - 0s 2ms/step - loss: 2.0865 - accuracy: 0.5955

<tensorflow.python.keras.callbacks.History at 0x7fc59381ac50>

Configuração de vários trabalhadores

Agora vamos entrar no mundo do treinamento para vários trabalhadores. No TensorFlow, a variável de ambiente TF_CONFIG é necessária para o treinamento em várias máquinas, cada uma das quais possivelmente possui uma função diferente. TF_CONFIG é uma sequência JSON usada para especificar a configuração do cluster em cada trabalhador que faz parte do cluster.

Existem dois componentes do TF_CONFIG : cluster e task . cluster fornece informações sobre o cluster de treinamento, que é um ditado que consiste em diferentes tipos de tarefas, como worker . No treinamento para vários trabalhadores com o MultiWorkerMirroredStrategy , geralmente há um worker que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever o arquivo de resumo do TensorBoard, além do que um worker comum faz. Esse trabalhador é chamado de trabalhador chief e é habitual que o worker com index 0 seja nomeado como worker principal (na verdade, é assim que tf.distribute.Strategy é implementado). task por outro lado, fornece informações sobre a tarefa atual. O primeiro cluster componentes é o mesmo para todos os trabalhadores, e a task do segundo componente é diferente em cada trabalhador e especifica o type e o index desse trabalhador.

Neste exemplo, definimos o type tarefa como "worker" e o index da tarefa como 0 . Isso significa que a máquina que possui essa configuração é o primeiro trabalhador, que será designado como o principal trabalhador e fará mais trabalho do que outros trabalhadores. Observe que outras máquinas também precisam ter a variável de ambiente TF_CONFIG definida e ela deve ter o mesmo comando de cluster , mas um type tarefa ou index tarefa diferente, dependendo de quais são as funções dessas máquinas.

Para fins ilustrativos, este tutorial mostra como se pode definir um TF_CONFIG com 2 trabalhadores no localhost . Na prática, os usuários criariam vários trabalhadores em endereços / portas IP externos e TF_CONFIG em cada trabalhador adequadamente.

 os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
 

Observe que, enquanto a taxa de aprendizado é fixa neste exemplo, em geral pode ser necessário ajustar a taxa de aprendizado com base no tamanho global do lote.

Escolha a estratégia certa

No TensorFlow, o treinamento distribuído consiste em treinamento síncrono, onde as etapas do treinamento são sincronizadas entre os trabalhadores e réplicas, e treinamento assíncrono, onde as etapas do treinamento não são estritamente sincronizadas.

MultiWorkerMirroredStrategy , que é a estratégia recomendada para o treinamento síncrono de vários trabalhadores, será demonstrado neste guia. Para treinar o modelo, use uma instância de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy cria cópias de todas as variáveis ​​nas camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps , um TensorFlow para comunicação coletiva, para agregar gradientes e manter as variáveis ​​sincronizadas. O guia tf.distribute.Strategy tem mais detalhes sobre essa estratégia.

 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy fornece várias implementações por meio do parâmetro CollectiveCommunication . RING implementa coletivos baseados em anel usando o gRPC como a camada de comunicação entre hosts. NCCL usa a NCCL da Nvidia para implementar coletivos. AUTO adia a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e a interconexão da rede no cluster.

Treine o modelo com o MultiWorkerMirroredStrategy

Com a integração da API tf.distribute.Strategy no tf.keras , a única alteração que você fará para distribuir o treinamento para vários trabalhadores é encerrar a chamada model building e model.compile() dentro de strategy.scope() . O escopo da estratégia de distribuição determina como e onde as variáveis ​​são criadas e, no caso do MultiWorkerMirroredStrategy , as variáveis ​​criadas são MirroredVariable s e são replicadas em cada um dos trabalhadores.

 num_workers = 4

# Here the batch size scales up by number of workers since 
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, 
# and now this becomes 128.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
 
Epoch 1/3
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
70/70 [==============================] - 0s 3ms/step - loss: 2.2682 - accuracy: 0.2265
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1714 - accuracy: 0.4954
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.0638 - accuracy: 0.6232

<tensorflow.python.keras.callbacks.History at 0x7fc5f4f062e8>

Fragmento de conjunto de dados e tamanho do lote

No treinamento para vários trabalhadores com o MultiWorkerMirroredStrategy , é necessário compartilhar o conjunto de dados para garantir a convergência e o desempenho. No entanto, observe que no snippet de código acima, os conjuntos de dados são diretamente passados ​​para model.fit() sem a necessidade de fragmentar; isso ocorre porque a API tf.distribute.Strategy cuida do sharding do conjunto de dados automaticamente. Ele fragmenta o conjunto de dados no nível do arquivo, o que pode criar fragmentos distorcidos. Em casos extremos em que há apenas um arquivo, apenas o primeiro fragmento (ou seja, trabalhador) receberá dados de treinamento ou avaliação e, como resultado, todos os trabalhadores receberão erros.

Se você preferir o sharding manual para seu treinamento, o sharding automático pode ser desativado através da API tf.data.experimental.DistributeOptions . Concretamente,

 options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
 

Outra coisa a se notar é o tamanho do lote para os datasets . No snippet de código acima, usamos global_batch_size = per_worker_batch_size * num_workers , que é num_workers vezes maior que no caso de trabalhador único, porque o tamanho do lote efetivo por trabalhador é o tamanho do lote global (o parâmetro passado em tf.data.Dataset.batch() ) dividido pelo número de trabalhadores e, com essa alteração, mantemos o tamanho do lote por trabalhador igual ao de antes.

Avaliação

Se você passar validation_data para model.fit , ele alternará entre treinamento e avaliação para cada época. A avaliação que toma validation_data é distribuída pelo mesmo conjunto de trabalhadores e os resultados da avaliação são agregados e disponíveis para todos os trabalhadores. Semelhante ao treinamento, o conjunto de dados de validação é dividido automaticamente no nível do arquivo. Você precisa definir um tamanho de lote global no conjunto de dados de validação e definir validation_steps . Um conjunto de dados repetido também é recomendado para avaliação.

Como alternativa, você também pode criar outra tarefa que leia periodicamente os pontos de verificação e execute a avaliação. É isso que o Estimator faz. Mas essa não é uma maneira recomendada de realizar avaliação e, portanto, seus detalhes são omitidos.

Predição

Atualmente model.predict não funciona com o MultiWorkerMirroredStrategy.

atuação

Agora você tem um modelo Keras configurado para executar em vários trabalhadores com o MultiWorkerMirroredStrategy . Você pode tentar as seguintes técnicas para ajustar o desempenho do treinamento para vários trabalhadores com o MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy fornece várias implementações de comunicação coletiva . RING implementa coletivos baseados em anel usando o gRPC como a camada de comunicação entre hosts. NCCL usa a NCCL da Nvidia para implementar coletivos. AUTO adia a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e a interconexão da rede no cluster. Para substituir a opção automática, especifique um valor válido para o parâmetro de communication do MultiWorkerMirroredStrategy do MultiWorkerMirroredStrategy , por exemplo, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .
  • Lance as variáveis ​​para tf.float se possível. O modelo oficial da ResNet inclui um exemplo de como isso pode ser feito.

Tolerância ao erro

No treinamento síncrono, o cluster falhará se um dos trabalhadores falhar e nenhum mecanismo de recuperação de falhas existir. O uso do Keras com o tf.distribute.Strategy a vantagem da tolerância a falhas nos casos em que os trabalhadores morrem ou são instáveis. Fazemos isso preservando o estado de treinamento no sistema de arquivos distribuído de sua escolha, para que, após a reinicialização da instância que falhou ou tenha se antecipado anteriormente, o estado de treinamento seja recuperado.

Como todos os trabalhadores são mantidos em sincronia em termos de etapas e etapas do treinamento, outros trabalhadores precisam aguardar a reinicialização do trabalhador com falha ou antecipado.

Retorno de chamada ModelCheckpoint

ModelCheckpoint retorno de chamada ModelCheckpoint não fornece mais a funcionalidade de tolerância a falhas, use o retorno de chamada BackupAndRestore .

O retorno de chamada ModelCheckpoint ainda pode ser usado para salvar pontos de verificação. Mas com isso, se o treinamento foi interrompido ou concluído com êxito, para continuar o treinamento a partir do ponto de verificação, o usuário é responsável por carregar o modelo manualmente. Opcionalmente, o usuário pode optar por salvar e restaurar o modelo / pesos fora do retorno de chamada ModelCheckpoint .

Salvamento e carregamento do modelo

Para salvar seu modelo usando model.save ou tf.saved_model.save , o destino para salvar precisa ser diferente para cada trabalhador. Nos trabalhadores não-chefe, você precisará salvar o modelo em um diretório temporário e, no chefe, precisará salvar no diretório de modelo fornecido. Os diretórios temporários no trabalhador precisam ser exclusivos para evitar erros resultantes de vários trabalhadores tentando gravar no mesmo local. O modelo salvo em todos os diretórios é idêntico e geralmente apenas o modelo salvo pelo chefe deve ser referenciado para restauração ou veiculação. Recomendamos que você tenha alguma lógica de limpeza que exclua os diretórios temporários criados pelos trabalhadores após a conclusão do treinamento.

O motivo pelo qual você precisa economizar no chefe e nos trabalhadores ao mesmo tempo é porque você pode agregar variáveis ​​durante o ponto de verificação, o que exige que o chefe e os trabalhadores participem do protocolo de comunicação de redução de reduções. Por outro lado, permitir que chefe e trabalhadores salve no mesmo diretório de modelo resultará em erros devido a contenção.

Com o MultiWorkerMirroredStrategy , o programa é executado em todos os trabalhadores e, para saber se o trabalhador atual é chefe, aproveitamos o objeto de resolução de cluster que possui os atributos task_type e task_id . task_type informa qual é o trabalho atual (por exemplo, 'worker') e task_id informa o identificador do trabalhador. O trabalhador com o ID 0 é designado como trabalhador principal.

No snippet de código abaixo, write_filepath fornece o caminho do arquivo a ser write_filepath , que depende da identificação do trabalhador. No caso de chief (worker com id 0), ele grava no caminho do arquivo original; para outros, ele cria um diretório temporário (com o ID no caminho do diretório) para escrever:

 model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works 
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
 

Com isso, você está pronto para salvar:

 multi_worker_model.save(write_model_path)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Como descrevemos acima, mais tarde, o modelo deve ser carregado apenas do caminho que o chefe salvou, então vamos remover os temporários que os trabalhadores não-chefe salvaram:

 if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))
 

Agora, quando for hora de carregar, vamos usar a API tf.keras.models.load_model conveniente e continuar com o trabalho. Aqui, assumimos apenas o uso de um único trabalhador para carregar e continuar o treinamento. tf.keras.models.load_model caso, você não chama tf.keras.models.load_model dentro de outro strategy.scope() .

 loaded_model = tf.keras.models.load_model(model_path)

# Now that we have the model restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9825 - accuracy: 0.1102
Epoch 2/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9367 - accuracy: 0.1117

<tensorflow.python.keras.callbacks.History at 0x7fc5f4b0d8d0>

Salvar e restaurar pontos de verificação

Por outro lado, o ponto de verificação permite salvar os pesos do modelo e restaurá-los sem precisar salvar o modelo inteiro. Aqui, você criará um tf.train.Checkpoint que rastreia o modelo, que é gerenciado por um tf.train.CheckpointManager para que apenas o último ponto de verificação seja preservado.

 checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
  checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
 

Depois que o CheckpointManager estiver configurado, você estará pronto para salvar e remover os pontos de verificação que os trabalhadores não-chefe salvaram.

 checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)
 

Agora, quando você precisar restaurar, poderá encontrar o ponto de verificação mais recente salvo usando a conveniente função tf.train.latest_checkpoint . Após restaurar o ponto de verificação, você pode continuar com o treinamento.

 latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9841 - accuracy: 0.6561
Epoch 2/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9445 - accuracy: 0.6805

<tensorflow.python.keras.callbacks.History at 0x7fc5f49d9d30>

Retorno de chamada de BackupAndRestore

O retorno de chamada BackupAndRestore fornece a funcionalidade de tolerância a falhas, fazendo backup do modelo e do número da época atual em um arquivo de ponto de verificação temporário sob o argumento backup_dir no BackupAndRestore . Isso é feito no final de cada época.

Depois que os trabalhos são interrompidos e reiniciados, o retorno de chamada restaura o último ponto de verificação e o treinamento continua desde o início da época interrompida. Qualquer treinamento parcial já realizado na época inacabada antes da interrupção será descartado, para que não afete o estado final do modelo.

Para usá-lo, forneça uma instância de tf.keras.callbacks.experimental.BackupAndRestore na chamada tf.keras.Model.fit() .

Com o MultiWorkerMirroredStrategy, se um trabalhador for interrompido, o cluster inteiro fará uma pausa até que o trabalhador interrompido seja reiniciado. Outros trabalhadores também serão reiniciados e o trabalhador interrompido voltará ao cluster. Em seguida, todo trabalhador lê o arquivo do ponto de verificação que foi salvo anteriormente e recupera seu estado anterior, permitindo que o cluster volte a sincronizar. Então o treinamento continua.

BackupAndRestore retorno de chamada BackupAndRestore usa o CheckpointManager para salvar e restaurar o estado do treinamento, que gera um arquivo chamado ponto de verificação que rastreia os pontos de verificação existentes junto com o mais recente. Por esse motivo, backup_dir não deve ser reutilizado para armazenar outros pontos de verificação para evitar colisão de nomes.

Atualmente, o retorno de chamada BackupAndRestore oferece suporte a um único trabalhador sem estratégia, MirroredStrategy, e multi-trabalhador com MultiWorkerMirroredStrategy. Abaixo estão dois exemplos de treinamento para trabalhadores múltiplos e treinamento para trabalhadores individuais.

 # Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
 
Epoch 1/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2837 - accuracy: 0.1836
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2131 - accuracy: 0.4091
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1310 - accuracy: 0.5485

<tensorflow.python.keras.callbacks.History at 0x7fc5f49a3080>

Se você inspecionar o diretório backup_dir especificado em BackupAndRestore , poderá observar alguns arquivos de ponto de verificação gerados temporariamente. Esses arquivos são necessários para recuperar as instâncias perdidas anteriormente e serão removidos pela biblioteca no final de tf.keras.Model.fit() após a saída bem-sucedida do seu treinamento.

Veja também

  1. Guia de treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
  2. Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
  3. A seção Desempenho no guia fornece informações sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho dos seus modelos TensorFlow.