O Dia da Comunidade de ML é dia 9 de novembro! Junte-nos para atualização de TensorFlow, JAX, e mais Saiba mais

Treinamento de vários trabalhadores com Keras

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Este tutorial demonstra como executar multi-trabalhador treinamento distribuído com um modelo Keras eo Model.fit API usando o tf.distribute.Strategy API-especificamente a tf.distribute.MultiWorkerMirroredStrategy classe. Com a ajuda dessa estratégia, um modelo Keras que foi projetado para ser executado em um único trabalhador pode funcionar perfeitamente em vários trabalhadores com alterações mínimas de código.

Para aqueles interessados em uma compreensão mais profunda de tf.distribute.Strategy APIs, a formação Distribuído em TensorFlow guia está disponível para uma visão geral das estratégias de distribuição TensorFlow suporta.

Para saber como usar o MultiWorkerMirroredStrategy com Keras e um loop de treinamento personalizado, consulte o ciclo de treinamento personalizado com Keras e MultiWorkerMirroredStrategy .

Observe que o objetivo deste tutorial é demonstrar um exemplo mínimo de vários trabalhadores com dois trabalhadores.

Configurar

Comece com algumas importações necessárias:

import json
import os
import sys

Antes de importar o TensorFlow, faça algumas alterações no ambiente:

  1. Desative todas as GPUs. Isso evita erros causados ​​por todos os trabalhadores tentando usar a mesma GPU. Em um aplicativo do mundo real, cada trabalhador estaria em uma máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Redefinir o TF_CONFIG variável de ambiente (você aprenderá mais sobre isso mais tarde):
os.environ.pop('TF_CONFIG', None)
  1. Certifique-se de que o diretório atual é em Python path-isto permite que o notebook para importar os arquivos escritos por %%writefile mais tarde:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Agora importe o TensorFlow:

import tensorflow as tf

Conjunto de dados e definição de modelo

Em seguida, criar uma mnist.py arquivo com um modelo simples e conjunto de dados de configuração. Este arquivo Python será usado pelos processos de trabalho neste tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Modelar o treinamento em um único trabalhador

Tente treinar o modelo para um pequeno número de épocas e observar os resultados de um único trabalhador para garantir que tudo funciona corretamente. Conforme o treinamento avança, a perda deve diminuir e a precisão deve aumentar.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

Configuração de vários trabalhadores

Agora vamos entrar no mundo do treinamento multi-trabalhadores.

Um cluster com trabalhos e tarefas

Em TensorFlow, formação distribuída envolve: um 'cluster' com vários postos de trabalho, e cada um dos postos de trabalho pode ter um ou mais 'task' s.

Você vai precisar do TF_CONFIG variável de ambiente de configuração para formação em várias máquinas, cada uma das quais, possivelmente, tem um papel diferente. TF_CONFIG é uma string JSON usado para especificar a configuração do cluster para cada trabalhador que faz parte do cluster.

Existem dois componentes de um TF_CONFIG variável: 'cluster' e 'task' .

  • Um 'cluster' é o mesmo para todos os trabalhadores e fornece informações sobre o cluster de formação, que é um dicionário composto por diferentes tipos de trabalhos, como 'worker' ou 'chief' .

    • No treinamento multi-trabalhador com tf.distribute.MultiWorkerMirroredStrategy , geralmente há um 'worker' que assume responsabilidades, como salvar um posto de controle e escrever um ficheiro de resumo para TensorBoard, para além do que um regular 'worker' faz. Tal 'worker' é referido como o chefe do trabalhador (com um nome de trabalho 'chief' ).
    • É habitual para o 'chief' ter 'index' 0 ser nomeado para (na verdade, é assim tf.distribute.Strategy é implementado).
  • A 'task' fornece informações da tarefa atual e é diferente para cada trabalhador. Ele especifica o 'type' e 'index' desse trabalhador.

Abaixo está um exemplo de configuração:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aqui é a mesma TF_CONFIG serializado como uma string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Note-se que tf_config é apenas uma variável local em Python. Para ser capaz de usá-lo para uma configuração de formação, este dict precisa ser serializada como JSON e colocado em um TF_CONFIG variável de ambiente.

No exemplo de configuração acima, você define a tarefa 'type' de 'worker' ea tarefa 'index' para 0 . Portanto, esta máquina é o primeiro trabalhador. Ele será nomeado como o 'chief' do trabalhador e fazer mais trabalho do que os outros.

Para fins de ilustração, Este tutorial mostra como você pode configurar uma TF_CONFIG variável com dois trabalhadores em um localhost .

Na prática, você criaria vários trabalhadores em endereços IP / portas externas e definir um TF_CONFIG variável sobre cada trabalhador em conformidade.

Neste tutorial, você usará dois trabalhadores:

  • O primeiro ( 'chief' do trabalhador) TF_CONFIG é mostrado acima.
  • Para o segundo trabalhador, você irá definir tf_config['task']['index']=1

Variáveis ​​de ambiente e subprocessos em notebooks

Os subprocessos herdam variáveis ​​de ambiente de seu pai.

Por exemplo, você pode definir uma variável de ambiente neste processo do Jupyter Notebook da seguinte maneira:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Em seguida, você pode acessar a variável de ambiente de um subprocessos:

echo ${GREETINGS}
Hello TensorFlow!

Na próxima seção, você vai usar um método semelhante ao passar o TF_CONFIG para os subprocessos de trabalho. Em um cenário do mundo real, você não iniciaria seus trabalhos dessa maneira, mas é suficiente neste exemplo.

Escolha a estratégia certa

No TensorFlow, existem duas formas principais de treinamento distribuído:

  • Formação síncrona, onde as etapas de formação são sincronizados através dos trabalhadores e réplicas, e
  • Treinamento assíncrono, onde as etapas de formação não são estritamente sincronizado (por exemplo, formação servidor parâmetro ).

Este tutorial demonstra como executar formação multi-trabalhador síncrona usando uma instância do tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy cria cópias de todas as variáveis em camadas do modelo em cada dispositivo em todos os trabalhadores. Ele usa CollectiveOps , uma op TensorFlow para comunicação coletiva, a gradientes de agregados e manter as variáveis em sincronia. O tf.distribute.Strategy guia tem mais detalhes sobre esta estratégia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fornece várias implementações via o CommunicationOptions parâmetro: 1) RING implementos colectivos anel baseado usando gRPC como camada de comunicação entre o hospedeiro; 2) NCCL usa o Coletivo Comunicação Biblioteca NVIDIA para implementar coletivos; e 3) AUTO adia a escolha para o tempo de execução. A melhor escolha de implementação coletiva depende do número e tipo de GPUs e da interconexão de rede no cluster.

Treine o modelo

Com a integração da tf.distribute.Strategy API em tf.keras , a única mudança que você vai fazer para distribuir o treinamento para-vários trabalhadores está encerrando a construção de modelos e model.compile() chamada dentro strategy.scope() . Ditames escopo da estratégia de distribuição como e onde as variáveis são criados, e no caso de MultiWorkerMirroredStrategy , as variáveis criadas são MirroredVariable s, e eles são replicados em cada um dos trabalhadores.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Para rodar com MultiWorkerMirroredStrategy você precisa executar processos de trabalho e passar um TF_CONFIG para eles.

Como o mnist.py arquivo escrito anteriormente, aqui é o main.py que cada um dos trabalhadores será executado:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

No trecho de código acima nota que o global_batch_size , que é passada para Dataset.batch , está definido para per_worker_batch_size * num_workers . Isto assegura que todos os trabalhadores processa lotes de per_worker_batch_size exemplos, independentemente do número de trabalhadores.

O diretório atual agora contém os dois arquivos Python:

ls *.py
main.py
mnist.py

Então JSON serializar o TF_CONFIG e adicioná-lo para as variáveis de ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Agora, você pode iniciar um processo de trabalho que irá executar o main.py e usar o TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Existem algumas coisas a serem observadas sobre o comando acima:

  1. Ele usa a %%bash que é um notebook "mágica" para executar alguns comandos bash.
  2. Ele usa o --bg bandeira para executar a bash processo em segundo plano, porque este trabalhador não irá terminar. Ele espera por todos os trabalhadores antes de começar.

O processo de trabalho em segundo plano não será impresso saída para este notebook, então o &> redireciona sua saída para um arquivo para que você possa inspecionar o que aconteceu em um arquivo de log mais tarde.

Portanto, aguarde alguns segundos para que o processo seja iniciado:

import time
time.sleep(10)

Agora, inspecione o que foi gerado no arquivo de log do trabalhador até agora:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

A última linha do arquivo de log deve dizer: Started server with target: grpc://localhost:12345 . O primeiro trabalhador está pronto e aguardando que todos os outros trabalhadores estejam prontos para prosseguir.

Então atualizar o tf_config para o processo do segundo trabalhador para pegar:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lance o segundo trabalhador. Isso iniciará o treinamento, uma vez que todos os trabalhadores estão ativos (portanto, não há necessidade de colocar em segundo plano este processo):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

Se você verificar novamente os registros escritos pelo primeiro trabalhador, aprenderá que ele participou do treinamento desse modelo:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

Sem surpresa, este correu mais lento do que o teste executado no início deste tutorial.

Executar vários trabalhadores em uma única máquina só adiciona sobrecarga.

O objetivo aqui não era melhorar o tempo de formação, mas apenas dar um exemplo de formação multi-trabalhadores.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Treinamento de vários trabalhadores em profundidade

Até agora, você aprendeu como realizar uma configuração básica de vários trabalhadores.

Durante o restante do tutorial, você aprenderá sobre outros fatores, que podem ser úteis ou importantes para casos de uso reais, em detalhes.

Fragmentação de conjunto de dados

No treinamento multi-trabalhador, é necessário conjunto de dados sharding para assegurar a convergência e desempenho.

O exemplo na seção anterior depende da autosharding padrão fornecida pelo tf.distribute.Strategy API. Você pode controlar a fragmentação definindo o tf.data.experimental.AutoShardPolicy dos tf.data.experimental.DistributeOptions .

Para saber mais sobre auto-sharding, consulte o guia de entrada distribuída .

Aqui está um exemplo rápido de como transformar o auto sharding off, para que cada réplica processa cada exemplo (não recomendado):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Avaliação

Se você passar no validation_data em Model.fit , ele irá alternar entre formação e avaliação para cada época. A avaliação de tomar a validation_data é distribuído entre o mesmo conjunto de trabalhadores e os resultados da avaliação são agregadas e disponíveis para todos os trabalhadores.

Semelhante ao treinamento, o conjunto de dados de validação é fragmentado automaticamente no nível do arquivo. Você precisa definir um tamanho de lote global no conjunto de dados de validação e definir as validation_steps .

Um conjunto de dados repetido também é recomendado para avaliação.

Como alternativa, você também pode criar outra tarefa que lê periodicamente os pontos de verificação e executa a avaliação. Isso é o que o Estimator faz. Mas esta não é uma forma recomendada de realizar a avaliação e, portanto, seus detalhes são omitidos.

atuação

Agora você tem um modelo Keras que tudo está configurado para executar em vários trabalhadores com a MultiWorkerMirroredStrategy .

Para ajustar o desempenho do treinamento de vários trabalhadores, você pode tentar o seguinte:

  • tf.distribute.MultiWorkerMirroredStrategy fornece várias implementações de comunicação coletiva :

    • RING colectivos baseados no anel implementos usando gRPC como camada de comunicação entre o hospedeiro.
    • NCCL usa a Biblioteca de Comunicação Coletiva NVIDIA para implementar coletivos.
    • AUTO adia a escolha para o tempo de execução.

    A melhor escolha de implementação coletiva depende do número de GPUs, do tipo de GPUs e da interconexão de rede no cluster. Para substituir a escolha automática, especifique o communication_options parâmetro de MultiWorkerMirroredStrategy construtor 's. Por exemplo:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Converter as variáveis para tf.float se possível:

    • O modelo oficial ResNet inclui um exemplo de como isso pode ser feito.

Tolerância ao erro

No treinamento síncrono, o cluster falhará se um dos trabalhadores falhar e nenhum mecanismo de recuperação de falha existir.

Usando Keras com tf.distribute.Strategy vem com a vantagem de tolerância a falhas em casos onde os trabalhadores morrem ou são de outra maneira instável. Você pode fazer isso preservando o estado de treinamento no sistema de arquivos distribuído de sua escolha, de modo que, após a reinicialização da instância que falhou ou interrompeu anteriormente, o estado de treinamento seja recuperado.

Quando um trabalhador fica indisponível, outros trabalhadores falharão (possivelmente após um tempo limite). Nesses casos, o trabalhador indisponível precisa ser reiniciado, assim como outros trabalhadores que falharam.

Callback ModelCheckpoint

ModelCheckpoint callback não fornece funcionalidade de tolerância a falhas, utilize o BackupAndRestore callback vez.

O ModelCheckpoint callback ainda pode ser usado para salvar postos de controle. Mas com isso, se o treinamento foi interrompido ou finalizado com sucesso, para continuar o treinamento a partir do checkpoint, o usuário é responsável por carregar o modelo manualmente.

Opcionalmente, o usuário pode escolher para salvar e restaurar modelo / pesos fora ModelCheckpoint callback.

Salvar e carregar modelo

Para salvar o seu modelo usando model.save ou tf.saved_model.save , as necessidades destino de gravação ser diferente para cada trabalhador.

  • Para trabalhadores não chefes, você precisará salvar o modelo em um diretório temporário.
  • Para o chefe, você precisará salvar no diretório do modelo fornecido.

Os diretórios temporários no trabalhador precisam ser exclusivos para evitar erros resultantes de vários trabalhadores tentando gravar no mesmo local.

O modelo salvo em todos os diretórios é idêntico e, normalmente, apenas o modelo salvo pelo chefe deve ser referenciado para restauração ou serviço.

Você deve ter alguma lógica de limpeza que exclua os diretórios temporários criados pelos trabalhadores assim que seu treinamento for concluído.

A razão para economizar no chefe e nos trabalhadores ao mesmo tempo é porque você pode agregar variáveis ​​durante o checkpoint, o que requer que o chefe e os trabalhadores participem do protocolo de comunicação allreduce. Por outro lado, permitir que o chefe e os trabalhadores salvem no mesmo diretório do modelo resultará em erros devido à contenção.

Usando o MultiWorkerMirroredStrategy , o programa é executado em cada trabalhador, e, a fim de saber se o trabalhador atual é o chefe, ele aproveita o objeto resolvedor cluster que tem atributos task_type e task_id :

  • task_type diz-lhe que o trabalho atual é (por exemplo, 'worker' ).
  • task_id diz-lhe o identificador do trabalhador.
  • O trabalhador com task_id == 0 é designado como o chefe do trabalhador.

No trecho de código abaixo, o write_filepath função fornece o caminho do arquivo de gravação, que depende da do trabalhador task_id :

  • Para o chefe do trabalhador (com task_id == 0 ), ele escreve para o caminho do arquivo original.
  • Para outros trabalhadores, ele cria um diretório-temporário temp_dir -com o task_id no caminho do diretório para escrever em:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Com isso, agora você está pronto para salvar:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Conforme descrito acima, posteriormente o modelo deve ser carregado apenas do caminho onde o chefe salvo, então vamos remover os temporários que os trabalhadores não-chefe salvaram:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Agora, quando é hora de carga, vamos usar conveniente tf.keras.models.load_model API, e continuar com os trabalhos futuros.

Aqui, somente assumem a usar único trabalhador para carga e continuar a formação, caso em que você não chamar tf.keras.models.load_model dentro de outro strategy.scope() (nota que strategy = tf.distribute.MultiWorkerMirroredStrategy() , como definido anteriormente ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

Salvando e restaurando pontos de verificação

Por outro lado, o checkpoint permite que você salve os pesos do seu modelo e os restaure sem ter que salvar todo o modelo.

Aqui, você vai criar um tf.train.Checkpoint que controla o modelo, que é gerido pelo tf.train.CheckpointManager , de modo que apenas o último checkpoint é preservada:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Uma vez que o CheckpointManager está configurado, você está agora pronto para salvar e remover os postos de controle os não-chefe trabalhadores haviam guardado:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Agora, quando você precisar restaurar o modelo, você pode encontrar o mais recente checkpoint salvo usando o conveniente tf.train.latest_checkpoint função. Depois de restaurar o ponto de verificação, você pode continuar com o treinamento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

Callback de BackupAndRestore

O tf.keras.callbacks.experimental.BackupAndRestore callback fornece a funcionalidade de tolerância a falhas de backup do modelo e número de época atual em um arquivo de ponto de verificação temporária sob backup_dir argumento para BackupAndRestore . Isso é feito no final de cada época.

Depois que as tarefas são interrompidas e reiniciadas, o retorno de chamada restaura o último ponto de verificação e o treinamento continua desde o início da época interrompida. Qualquer treinamento parcial já feito na época inacabada antes da interrupção será descartado, de modo que não afete o estado final do modelo.

Para usá-lo, fornecer uma instância de tf.keras.callbacks.experimental.BackupAndRestore no Model.fit chamada.

Com MultiWorkerMirroredStrategy , se um trabalhador for interrompido, todo o cluster faz uma pausa até que o trabalhador interrompido é reiniciado. Outros trabalhos também serão reiniciados e o trabalho interrompido retornará ao cluster. Em seguida, cada trabalhador lê o arquivo de ponto de verificação que foi salvo anteriormente e retoma seu estado anterior, permitindo que o cluster volte a sincronizar. Então, o treinamento continua.

O BackupAndRestore callback usa o CheckpointManager para salvar e restaurar o estado de treinamento, o que gera um arquivo chamado ponto de verificação que faixas existentes checkpoints em conjunto com o mais recente. Por esta razão, backup_dir não deve ser re-utilizado para armazenar outros postos de controle, a fim de evitar a colisão nome.

Atualmente, o BackupAndRestore callback suporta trabalhador solteiro com nenhuma estratégia, MirroredStrategy e multi-trabalhador com MultiWorkerMirroredStrategy. Abaixo estão dois exemplos de treinamento de vários trabalhadores e treinamento de um único trabalhador.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

Se você inspecionar o diretório de backup_dir especificado na BackupAndRestore , você pode perceber alguns arquivos de ponto de verificação gerados temporariamente. Esses arquivos são necessários para recuperar as instâncias anteriormente perdidos, e eles serão removidos pela biblioteca no final do Model.fit Ao sair bem sucedida de sua formação.

Recursos adicionais

  1. A formação Distribuído em TensorFlow guia fornece uma visão geral das estratégias de distribuição disponíveis.
  2. O ciclo de treinamento personalizado com Keras e MultiWorkerMirroredStrategy mostra tutorial como usar o MultiWorkerMirroredStrategy com Keras e um loop de treinamento personalizado.
  3. Confira os modelos oficiais , muitos dos quais podem ser configurados para executar múltiplas estratégias de distribuição.
  4. O desempenho melhor com tf.function guia fornece informações sobre outras estratégias e ferramentas, como o TensorFlow Profiler você pode usar para otimizar o desempenho de seus modelos TensorFlow.