Treinamento de vários trabalhadores com Keras

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra o treinamento distribuído de vários trabalhadores com o modelo Keras usando a API tf.distribute.Strategy , especificamente tf.distribute.MultiWorkerMirroredStrategy . Com a ajuda dessa estratégia, um modelo Keras que foi projetado para ser executado em um único trabalhador pode funcionar perfeitamente em vários trabalhadores com alterações mínimas de código.

O guia Treinamento distribuído no TensorFlow está disponível para uma visão geral das estratégias de distribuição compatíveis com o TensorFlow para aqueles interessados ​​em um entendimento mais profundo das APIs tf.distribute.Strategy .

Configurar

Primeiro, algumas importações necessárias.

import json
import os
import sys

Antes de importar o TensorFlow, faça algumas alterações no ambiente.

Desative todas as GPUs. Isso evita erros causados ​​por todos os trabalhadores tentando usar a mesma GPU. Para uma aplicação real, cada trabalhador estaria em uma máquina diferente.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Redefina a variável de ambiente TF_CONFIG , você verá mais sobre isso mais tarde.

os.environ.pop('TF_CONFIG', None)

Certifique-se de que o diretório atual está no caminho do python. Isso permite que o notebook importe os arquivos gravados por %%writefile posteriormente.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Agora importe o TensorFlow.

import tensorflow as tf

Conjunto de dados e definição de modelo

Em seguida, crie um arquivo mnist.py com um modelo simples e configuração de conjunto de dados. Este arquivo Python será usado pelos processos de trabalho neste tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Tente treinar o modelo para um pequeno número de épocas e observe os resultados de um único trabalhador para garantir que tudo funcione corretamente. Conforme o treinamento avança, a perda deve diminuir e a precisão deve aumentar.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2783 - accuracy: 0.2152
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2298 - accuracy: 0.3783
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1673 - accuracy: 0.5096
<tensorflow.python.keras.callbacks.History at 0x7fb074a356d0>

Configuração de vários trabalhadores

Agora vamos entrar no mundo do treinamento multiusuário. No TensorFlow, a variável de ambiente TF_CONFIG é necessária para o treinamento em várias máquinas, cada uma delas possivelmente com uma função diferente. TF_CONFIG é uma string JSON usada para especificar a configuração do cluster em cada trabalhador que faz parte do cluster.

Aqui está um exemplo de configuração:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aqui está o mesmo TF_CONFIG serializado como uma string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Existem dois componentes de TF_CONFIG : cluster e task .

  • cluster é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de treinamento, que é um dict que consiste em diferentes tipos de trabalhos, como worker . No treinamento de vários trabalhadores com MultiWorkerMirroredStrategy , geralmente há um worker que assume um pouco mais de responsabilidade, como salvar o ponto de verificação e escrever um arquivo de resumo para o TensorBoard, além do que um worker regular faz. Esse trabalhador é referido como o trabalhador chief , e é comum que o worker com index 0 seja indicado como o worker chefe (na verdade, é assim que tf.distribute.Strategy é implementado).

  • task fornece informações da tarefa atual e é diferente em cada trabalhador. Ele especifica o type e o index desse trabalhador.

Neste exemplo, você define o type tarefa como "worker" e o index da tarefa como 0 . Esta máquina é o primeiro operário e será nomeada operária principal e fará mais trabalho do que as outras. Observe que outras máquinas também precisarão ter a variável de ambiente TF_CONFIG configurada, e deve ter o mesmo dicionário de cluster , mas type tarefa ou index tarefa diferente dependendo de quais são as funções dessas máquinas.

Para fins de ilustração, este tutorial mostra como se pode definir um TF_CONFIG com 2 workers no localhost . Na prática, os usuários criariam vários workers em endereços / portas IP externos e TF_CONFIG em cada worker apropriadamente.

Neste exemplo, você usará 2 trabalhadores, o TF_CONFIG do primeiro trabalhador é mostrado acima. Para o segundo trabalhador, você tf_config['task']['index']=1

Acima, tf_config é apenas uma variável local em python. Para realmente usá-lo para configurar o treinamento, este dicionário precisa ser serializado como JSON e colocado na variável de ambiente TF_CONFIG .

Variáveis ​​de ambiente e subprocessos em notebooks

Os subprocessos herdam variáveis ​​de ambiente de seus pais. Portanto, se você definir uma variável de ambiente neste processo do jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Você pode acessar a variável de ambiente de um subprocessos:

echo ${GREETINGS}
Hello TensorFlow!

Na próxima seção, você usará isso para passar o TF_CONFIG para os subprocessos do trabalhador. Você nunca iniciaria realmente seus trabalhos dessa maneira, mas é suficiente para os objetivos deste tutorial: Para demonstrar um exemplo de multiusuário mínimo.

Escolha a estratégia certa

No TensorFlow, existem duas formas principais de treinamento distribuído:

  • Treinamento síncrono, onde as etapas do treinamento são sincronizadas entre os workers e réplicas, e
  • Treinamento assíncrono, em que as etapas de treinamento não são estritamente sincronizadas.

MultiWorkerMirroredStrategy , que é a estratégia recomendada para treinamento síncrono de vários trabalhadores, será demonstrada neste guia. Para treinar o modelo, use uma instância de tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy cria cópias de todas as variáveis ​​nas camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps , um TensorFlow op para comunicação coletiva, para agregar gradientes e manter as variáveis ​​em sincronia. O guia tf.distribute.Strategy contém mais detalhes sobre essa estratégia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fornece várias implementações por meio do parâmetro CommunicationOptions . RING implementa coletivos baseados em anel usando gRPC como camada de comunicação entre hosts. NCCL usa a NCCL da Nvidia para implementar coletivos. AUTO transfere a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster.

Treine o modelo

Com a integração da API tf.distribute.Strategy em tf.keras , a única alteração que você fará para distribuir o treinamento para vários trabalhadores é incluir a construção do modelo e a chamada model.compile() dentro de strategy.scope() . O escopo da estratégia de distribuição dita como e onde as variáveis ​​são criadas e, no caso de MultiWorkerMirroredStrategy , as variáveis ​​criadas são MirroredVariable s, e são replicadas em cada um dos trabalhadores.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Para realmente executar com MultiWorkerMirroredStrategy você precisará executar processos de trabalho e passar um TF_CONFIG para eles.

Como o arquivo mnist.py escrito anteriormente, aqui está o main.py que cada um dos workers executará:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

No trecho de código acima, observe que global_batch_size , que é passado para Dataset.batch , é definido como per_worker_batch_size * num_workers . Isso garante que cada trabalhador processe lotes de exemplos de per_worker_batch_size independentemente do número de trabalhadores.

O diretório atual agora contém os dois arquivos Python:

ls *.py
main.py
mnist.py

Então json-serialize o TF_CONFIG e adicione-o às variáveis ​​de ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora, você pode iniciar um processo de trabalho que executará o main.py e usará o TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Existem algumas coisas a serem observadas sobre o comando acima:

  1. Ele usa o %%bash que é um notebook "mágico" para executar alguns comandos do bash.
  2. Ele usa a sinalização --bg para executar o processo bash em segundo plano, porque este trabalhador não será encerrado. Ele espera por todos os trabalhadores antes de começar.

O processo de trabalho em segundo plano não imprimirá a saída para este bloco de notas, então o &> redireciona sua saída para um arquivo, para que você possa ver o que aconteceu.

Portanto, aguarde alguns segundos para que o processo seja iniciado:

import time
time.sleep(10)

Agora veja o que foi gerado para o arquivo de log do trabalhador até agora:

cat job_0.log
2021-06-16 18:40:46.618023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:47.717322: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:48.633623: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:48.633682: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633691: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633802: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:48.633834: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:48.633841: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:48.634554: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:48.639603: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:48.640062: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345 . O primeiro trabalhador está pronto e aguardando que todos os outros trabalhadores estejam prontos para prosseguir.

Portanto, atualize o tf_config para que o segundo processo do trabalhador pegue:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora lance o segundo trabalhador. Isso iniciará o treinamento, uma vez que todos os trabalhadores estão ativos (portanto, não há necessidade de colocar em segundo plano este processo):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 53ms/step - loss: 2.2947 - accuracy: 0.1365
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2710 - accuracy: 0.2564
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2412 - accuracy: 0.3920
2021-06-16 18:40:56.710304: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:57.818915: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:58.745385: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:58.745442: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:58.745451: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:58.745567: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:58.745603: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:58.745609: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:58.746272: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:58.751609: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:58.752063: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:40:59.797443: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-06-16 18:41:00.007608: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:41:00.007984: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz

Agora, se você verificar novamente os registros escritos pelo primeiro trabalhador, verá que ele participou do treinamento desse modelo:

cat job_0.log
2021-06-16 18:40:46.618023: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:40:47.717322: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:40:48.633623: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:40:48.633682: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633691: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:40:48.633802: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:40:48.633834: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:40:48.633841: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:40:48.634554: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:40:48.639603: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:40:48.640062: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:40:59.794960: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-06-16 18:41:00.007264: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:41:00.007667: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch 1/3
70/70 [==============================] - 6s 53ms/step - loss: 2.2947 - accuracy: 0.1365
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2710 - accuracy: 0.2564
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2412 - accuracy: 0.3920

Não é novidade que isso foi executado mais lentamente do que o teste executado no início deste tutorial. Executar vários trabalhadores em uma única máquina só adiciona sobrecarga. O objetivo aqui não era melhorar o tempo de formação, mas apenas dar um exemplo de formação multi-trabalhadores.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Treinamento de vários trabalhadores em profundidade

Até agora, este tutorial demonstrou uma configuração básica para vários trabalhadores. O restante deste documento examina em detalhes outros fatores que podem ser úteis ou importantes para casos de uso reais.

Fragmentação de conjunto de dados

No treinamento de vários trabalhadores, a fragmentação do conjunto de dados é necessária para garantir a convergência e o desempenho.

O exemplo na seção anterior depende do autosharding padrão fornecido pela API tf.distribute.Strategy . Você pode controlar a fragmentação definindo tf.data.experimental.AutoShardPolicy de tf.data.experimental.DistributeOptions . Para saber mais sobre a fragmentação automática, consulte o guia de entrada distribuída .

Aqui está um exemplo rápido de como DESATIVAR a fragmentação automática, para que cada réplica processe todos os exemplos (não recomendado):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Avaliação

Se você passar validation_data para model.fit , ele alternará entre o treinamento e a avaliação para cada época. A avaliação de validation_data é distribuída pelo mesmo conjunto de trabalhadores e os resultados da avaliação são agregados e disponíveis para todos os trabalhadores. Semelhante ao treinamento, o conjunto de dados de validação é fragmentado automaticamente no nível do arquivo. Você precisa definir um tamanho de lote global no conjunto de dados de validação e definir validation_steps . Um conjunto de dados repetido também é recomendado para avaliação.

Como alternativa, você também pode criar outra tarefa que lê periodicamente os pontos de verificação e executa a avaliação. Isso é o que o Estimator faz. Mas esta não é uma forma recomendada de realizar a avaliação e, portanto, seus detalhes são omitidos.

Desempenho

Agora você tem um modelo Keras configurado para ser executado em vários workers com MultiWorkerMirroredStrategy . Você pode tentar as técnicas a seguir para ajustar o desempenho do treinamento de vários trabalhadores com MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy fornece várias implementações de comunicação coletiva . RING implementa coletivos baseados em anel usando gRPC como a camada de comunicação entre hosts. NCCL usa a NCCL da Nvidia para implementar coletivos. AUTO transfere a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique o parâmetro communication_options do MultiWorkerMirroredStrategy , por exemplo, communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • tf.float as variáveis ​​para tf.float se possível. O modelo oficial do ResNet inclui um exemplo de como isso pode ser feito.

Tolerância ao erro

No treinamento síncrono, o cluster falhará se um dos trabalhadores falhar e nenhum mecanismo de recuperação de falha existir. O uso de Keras com tf.distribute.Strategy tem a vantagem de tolerância a falhas nos casos em que os trabalhadores morrem ou ficam instáveis. Você faz isso preservando o estado de treinamento no sistema de arquivos distribuído de sua escolha, de modo que, após a reinicialização da instância que falhou ou interrompeu anteriormente, o estado de treinamento seja recuperado.

Quando um trabalhador fica indisponível, outros trabalhadores falharão (possivelmente após um tempo limite). Nesses casos, o trabalhador indisponível precisa ser reiniciado, assim como outros trabalhadores que falharam.

Callback ModelCheckpoint

ModelCheckpoint retorno de chamada ModelCheckpoint não fornece mais funcionalidade de tolerância a falhas, use o retorno de chamada BackupAndRestore .

O retorno de chamada ModelCheckpoint ainda pode ser usado para salvar pontos de verificação. Mas com isso, se o treinamento foi interrompido ou finalizado com sucesso, para continuar o treinamento a partir do checkpoint, o usuário é responsável por carregar o modelo manualmente.

Opcionalmente, o usuário pode escolher salvar e restaurar o modelo / pesos fora do retorno de chamada do ModelCheckpoint .

Salvar e carregar modelo

Para salvar seu modelo usando model.save ou tf.saved_model.save , o destino para salvar precisa ser diferente para cada trabalhador. Nos trabalhadores não principais, você precisará salvar o modelo em um diretório temporário e, no chefe, precisará salvar no diretório do modelo fornecido. Os diretórios temporários no trabalhador precisam ser exclusivos para evitar erros resultantes de vários trabalhadores tentando gravar no mesmo local. Os modelos salvos em todos os diretórios são idênticos e, normalmente, apenas o modelo salvo pelo chefe deve ser referenciado para restauração ou serviço. Você deve ter alguma lógica de limpeza que exclua os diretórios temporários criados pelos trabalhadores assim que seu treinamento for concluído.

O motivo pelo qual você precisa economizar no chefe e nos trabalhadores ao mesmo tempo é porque você pode agregar variáveis ​​durante o checkpoint, o que requer que o chefe e os trabalhadores participem do protocolo de comunicação allreduce. Por outro lado, permitir que o chefe e os trabalhadores salvem no mesmo diretório do modelo resultará em erros devido à contenção.

Com MultiWorkerMirroredStrategy , o programa é executado em todos os trabalhadores e, para saber se o trabalhador atual é o chefe, ele aproveita o objeto resolvedor de cluster que possui os atributos task_type e task_id . task_type diz a você qual é o trabalho atual (por exemplo, 'trabalhador'), e task_id diz a você o identificador do trabalhador. O trabalhador com id 0 é designado como o trabalhador chefe.

No trecho de código abaixo, write_filepath fornece o caminho do arquivo para gravação, que depende do ID do trabalhador. No caso de chefe (trabalhador com id 0), ele grava no caminho do arquivo original; para outros, ele cria um diretório temporário (com id no caminho do diretório) para escrever em:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Com isso, agora você está pronto para salvar:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Conforme descrito acima, posteriormente o modelo deve ser carregado apenas do caminho onde o chefe salvo, então vamos remover os temporários que os trabalhadores não-chefe salvaram:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Agora, quando é hora de carregar, vamos usar a conveniente API tf.keras.models.load_model e continuar com o trabalho adicional. Aqui, suponha que use apenas um único trabalhador para carregar e continuar o treinamento, caso em que você não chama tf.keras.models.load_model dentro de outro strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.3081 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2914 - accuracy: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7fb08ad02dd0>

Salvando e restaurando pontos de verificação

Por outro lado, o checkpoint permite salvar os pesos do modelo e restaurá-los sem ter que salvar todo o modelo. Aqui, você criará um tf.train.Checkpoint que rastreia o modelo, que é gerenciado por um tf.train.CheckpointManager para que apenas o último ponto de verificação seja preservado.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Assim que o CheckpointManager estiver configurado, você estará pronto para salvar e remover os checkpoints que os trabalhadores não chefes salvos.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Agora, quando precisar restaurar, você pode encontrar o último ponto de verificação salvo usando a conveniente função tf.train.latest_checkpoint . Depois de restaurar o ponto de verificação, você pode continuar com o treinamento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 12ms/step - loss: 2.3080 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 11ms/step - loss: 2.2896 - accuracy: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7fb08a2b6bd0>

Callback de BackupAndRestore

O retorno de chamada BackupAndRestore fornece funcionalidade de tolerância a falhas, fazendo backup do modelo e do número da época atual em um arquivo de ponto de verificação temporário sob o argumento backup_dir para BackupAndRestore . Isso é feito no final de cada época.

Depois que as tarefas são interrompidas e reiniciadas, o retorno de chamada restaura o último ponto de verificação e o treinamento continua desde o início da época interrompida. Qualquer treinamento parcial já feito na época inacabada antes da interrupção será descartado, de modo que não afete o estado final do modelo.

Para usá-lo, forneça uma instância de tf.keras.callbacks.experimental.BackupAndRestore na chamada tf.keras.Model.fit() .

Com MultiWorkerMirroredStrategy, se um trabalhador for interrompido, todo o cluster fará uma pausa até que o trabalhador interrompido seja reiniciado. Outros trabalhos também serão reiniciados e o trabalho interrompido retornará ao cluster. Em seguida, cada trabalhador lê o arquivo de ponto de verificação que foi salvo anteriormente e retoma seu estado anterior, permitindo que o cluster volte a sincronizar. Em seguida, o treinamento continua.

BackupAndRestore retorno de chamada BackupAndRestore usa CheckpointManager para salvar e restaurar o estado de treinamento, que gera um arquivo chamado checkpoint que rastreia os checkpoints existentes junto com o mais recente. Por esse motivo, backup_dir não deve ser reutilizado para armazenar outros pontos de verificação para evitar colisão de nomes.

Atualmente, o retorno de chamada BackupAndRestore oferece suporte a um único trabalhador sem estratégia, MirroredStrategy e multiusuário com MultiWorkerMirroredStrategy. Abaixo estão dois exemplos de treinamento de vários trabalhadores e treinamento de um único trabalhador.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2774 - accuracy: 0.2183
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2264 - accuracy: 0.3663
Epoch 3/3
70/70 [==============================] - 1s 11ms/step - loss: 2.1693 - accuracy: 0.4643
<tensorflow.python.keras.callbacks.History at 0x7fb08a032790>

Se você inspecionar o diretório de backup_dir especificado em BackupAndRestore , poderá notar alguns arquivos de ponto de verificação gerados temporariamente. Esses arquivos são necessários para recuperar as instâncias perdidas anteriormente e serão removidos pela biblioteca no final de tf.keras.Model.fit() após a saída bem-sucedida do treinamento.

Veja também

  1. O guia Treinamento distribuído no TensorFlow fornece uma visão geral das estratégias de distribuição disponíveis.
  2. Modelos oficiais , muitos dos quais podem ser configurados para executar várias estratégias de distribuição.
  3. A seção Desempenho do guia fornece informações sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho dos seus modelos do TensorFlow.