Loop de treinamento personalizado com Keras e MultiWorkerMirroredStrategy

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra o treinamento de vários trabalhadores com a API de loop de treinamento personalizada, distribuída via MultiWorkerMirroredStrategy, de modo que um modelo Keras projetado para ser executado em um único trabalhador pode funcionar perfeitamente em vários trabalhadores com alterações mínimas de código.

Estamos usando loops de treinamento personalizados para treinar nosso modelo porque eles nos dão flexibilidade e um maior controle sobre o treinamento. Além disso, é mais fácil depurar o modelo e o loop de treinamento. Informações mais detalhadas estão disponíveis em Escrevendo um loop de treinamento do zero .

Se você está procurando como usar MultiWorkerMirroredStrategy com keras model.fit , consulte este tutorial .

O guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição compatíveis com o TensorFlow para aqueles interessados ​​em um entendimento mais profundo das APIs tf.distribute.Strategy .

Configurar

Primeiro, algumas importações necessárias.

import json
import os
import sys

Antes de importar o TensorFlow, faça algumas alterações no ambiente.

Desative todas as GPUs. Isso evita erros causados ​​por todos os trabalhadores tentando usar a mesma GPU. Para uma aplicação real, cada trabalhador estaria em uma máquina diferente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Redefina a variável de ambiente TF_CONFIG , você verá mais sobre isso mais tarde.

os.environ.pop('TF_CONFIG', None)

Certifique-se de que o diretório atual está no caminho do python. Isso permite que o notebook importe os arquivos gravados por %%writefile posteriormente.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Agora importe o TensorFlow.

import tensorflow as tf

Conjunto de dados e definição de modelo

Em seguida, crie um arquivo mnist.py com um modelo simples e configuração de conjunto de dados. Este arquivo Python será usado pelos processos de trabalho neste tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configuração de vários trabalhadores

Agora vamos entrar no mundo do treinamento multi-trabalhadores. No TensorFlow, a variável de ambiente TF_CONFIG é necessária para o treinamento em várias máquinas, cada uma delas possivelmente com uma função diferente. TF_CONFIG usado abaixo, é uma string JSON usada para especificar a configuração do cluster em cada trabalhador que faz parte do cluster. Este é o método padrão para especificar um cluster, usando cluster_resolver.TFConfigClusterResolver , mas há outras opções disponíveis no distribute.cluster_resolver módulo.

Descreva seu cluster

Aqui está um exemplo de configuração:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aqui está o mesmo TF_CONFIG serializado como uma string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Existem dois componentes de TF_CONFIG : cluster e task .

  • cluster é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de treinamento, que é um dict que consiste em diferentes tipos de trabalhos, como worker . No treinamento de vários trabalhadores com MultiWorkerMirroredStrategy , geralmente há um worker que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever um arquivo de resumo para o TensorBoard, além do que um worker regular faz. Esse trabalhador é referido como o trabalhador chief , e é comum que o worker com index 0 seja indicado como o worker chefe (na verdade, é assim que tf.distribute.Strategy é implementado).

  • task fornece informações da tarefa atual e é diferente em cada trabalhador. Ele especifica o type e o index desse trabalhador.

Neste exemplo, você define o type tarefa como "worker" e o index da tarefa como 0 . Esta máquina é o primeiro operário e será nomeada operária principal e fará mais trabalho do que as outras. Observe que outras máquinas também precisarão ter a variável de ambiente TF_CONFIG configurada, e deve ter o mesmo dicionário de cluster , mas type tarefa ou index tarefa diferente dependendo de quais são as funções dessas máquinas.

Para fins de ilustração, este tutorial mostra como se pode definir um TF_CONFIG com 2 workers no localhost . Na prática, os usuários criariam vários workers em endereços / portas IP externos e TF_CONFIG em cada worker apropriadamente.

Neste exemplo, você usará 2 trabalhadores, o TF_CONFIG do primeiro trabalhador é mostrado acima. Para o segundo trabalhador, você tf_config['task']['index']=1

Acima, tf_config é apenas uma variável local em python. Para realmente usá-lo para configurar o treinamento, este dicionário precisa ser serializado como JSON e colocado na variável de ambiente TF_CONFIG .

Variáveis ​​de ambiente e subprocessos em notebooks

Os subprocessos herdam variáveis ​​de ambiente de seu pai. Portanto, se você definir uma variável de ambiente neste processo do jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Você pode acessar a variável de ambiente de um subprocessos:

echo ${GREETINGS}
Hello TensorFlow!

Na próxima seção, você usará isso para passar o TF_CONFIG para os subprocessos do trabalhador. Você nunca iniciaria realmente seus trabalhos dessa maneira, mas é suficiente para os objetivos deste tutorial: Para demonstrar um exemplo de multiusuário mínimo.

MultiWorkerMirroredStrategy

Para treinar o modelo, use uma instância de tf.distribute.MultiWorkerMirroredStrategy , que cria cópias de todas as variáveis ​​nas camadas do modelo em cada dispositivo em todos os trabalhadores. O guia tf.distribute.Strategy contém mais detalhes sobre essa estratégia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

Use tf.distribute.Strategy.scope para especificar que uma estratégia deve ser usada ao construir seu modelo. Isso o coloca no " contexto de réplica cruzada " para essa estratégia, o que significa que a estratégia é colocada no controle de coisas como posicionamento de variável.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Fragmente automaticamente seus dados entre os trabalhadores

No treinamento de vários trabalhadores, a fragmentação do conjunto de dados não é necessariamente necessária; no entanto, oferece exatamente uma semântica, o que torna mais treinamento mais reproduzível, ou seja, o treinamento em vários trabalhadores deve ser o mesmo que o treinamento em um único trabalhador. Nota: o desempenho pode ser afetado em alguns casos.

Veja: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Defina o ciclo de treinamento personalizado e treine o modelo

Especifique um otimizador

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Defina uma etapa de treinamento com tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Salvando e restaurando pontos de verificação

A implementação do ponto de verificação em um loop de treinamento personalizado requer que o usuário lide com isso em vez de usar um retorno de chamada keras. Permite salvar os pesos do modelo e restaurá-los sem ter que salvar todo o modelo.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Aqui, você criará um tf.train.Checkpoint que rastreia o modelo, que é gerenciado por um tf.train.CheckpointManager para que apenas o último ponto de verificação seja preservado.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Agora, quando precisar restaurar, você pode encontrar o último ponto de verificação salvo usando a conveniente função tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Depois de restaurar o ponto de verificação, você pode continuar treinando seu loop de treinamento personalizado.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

Configuração completa do código nos trabalhadores

Para realmente executar com MultiWorkerMirroredStrategy você precisará executar processos de trabalho e passar um TF_CONFIG para eles.

Como o arquivo mnist.py escrito anteriormente, aqui está o main.py que contém o mesmo código que percorremos passo a passo anteriormente neste colab, estamos apenas gravando em um arquivo para que cada um dos workers o execute:

Arquivo: main.py

Writing main.py

Treinar e avaliar

O diretório atual agora contém os dois arquivos Python:

ls *.py
main.py
mnist.py

Então json-serialize o TF_CONFIG e adicione-o às variáveis ​​de ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora, você pode iniciar um processo de trabalho que executará o main.py e usará o TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Existem algumas coisas a serem observadas sobre o comando acima:

  1. Ele usa o %%bash que é um notebook "mágico" para executar alguns comandos do bash.
  2. Ele usa a sinalização --bg para executar o processo bash em segundo plano, porque este trabalhador não será encerrado. Ele espera por todos os trabalhadores antes de começar.

O processo de trabalho em segundo plano não imprimirá a saída para este bloco de notas, então o &> redireciona sua saída para um arquivo, para que você possa ver o que aconteceu.

Portanto, espere alguns segundos para que o processo seja iniciado:

import time
time.sleep(20)

Agora veja o que foi gerado para o arquivo de log do trabalhador até agora:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345 . O primeiro trabalhador está pronto e aguardando que todos os outros trabalhadores estejam prontos para prosseguir.

Portanto, atualize o tf_config para que o segundo processo do trabalhador pegue:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora lance o segundo trabalhador. Isso iniciará o treinamento, já que todos os trabalhadores estão ativos (portanto, não há necessidade de colocar em segundo plano este processo):

python main.py > /dev/null 2>&1

Agora, se você verificar novamente os registros escritos pelo primeiro trabalhador, verá que ele participou do treinamento desse modelo:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Treinamento de vários trabalhadores em profundidade

Este tutorial demonstrou um fluxo de trabalho de Custom Training Loop da configuração de vários trabalhadores. Uma descrição detalhada de outros tópicos está disponível no model.fit's guide configuração de vários trabalhadores e aplicável a CTLs.

Veja também

  1. O guia Treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
  2. Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
  3. A seção Desempenho do guia fornece informações sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho dos seus modelos do TensorFlow.