Treinamento de servidor de parâmetros com ParameterServerStrategy

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

Formação servidor parâmetro é um método de dados em paralelo comum para intensificar a formação modelo em várias máquinas.

Um cluster formação servidor parâmetro consiste de trabalhadores e servidores de parâmetro. As variáveis ​​são criadas em servidores de parâmetros e são lidas e atualizadas pelos trabalhadores em cada etapa. Por padrão, os trabalhadores leem e atualizam essas variáveis ​​independentemente, sem sincronizar uns com os outros. É por isso que às vezes parâmetro formação de estilo servidor é chamado de treinamento assíncrono.

Em TensorFlow 2, formação servidor parâmetro é alimentado pelo tf.distribute.experimental.ParameterServerStrategy classe, que distribui os passos de formação de um cluster que escalar até milhares de trabalhadores (acompanhado por servidores de parâmetro).

Métodos de treinamento com suporte

Existem dois métodos principais de treinamento:

Um cluster com trabalhos e tarefas

Independentemente da API de escolha ( Model.fit ou um ciclo de treinamento personalizado), o treinamento distribuídos em TensorFlow 2 envolve: um 'cluster' com vários 'jobs' , e cada um dos postos de trabalho pode ter um ou mais 'tasks' .

Ao usar o treinamento do servidor de parâmetros, é recomendado ter:

  • Um trabalho do coordenador (que tem o nome do trabalho chief )
  • Vários trabalhos de trabalho (trabalho nome worker ); e
  • Vários trabalhos do servidor parâmetro (nome do trabalho ps )

Enquanto o coordenador cria recursos, despachos formação tarefas, escreve checkpoints, e lida com falhas de tarefa, os trabalhadores e servidores de parâmetros executar tf.distribute.Server que escutar as solicitações do coordenador.

Formação servidor parâmetro com Model.fit API

Formação servidor parâmetro com o Model.fit API requer o coordenador de usar um tf.distribute.experimental.ParameterServerStrategy objeto, e uma tf.keras.utils.experimental.DatasetCreator como entrada. Semelhante ao Model.fit uso sem estratégia, ou com outras estratégias, o fluxo de trabalho envolve a criação e compilar o modelo, preparando os retornos de chamada, seguido por um Model.fit chamada.

Treinamento de servidor de parâmetros com um loop de treinamento personalizado

Com loops de treinamento personalizados, o tf.distribute.experimental.coordinator.ClusterCoordinator classe é o componente chave utilizada para o coordenador.

A API mais importante fornecido pelo ClusterCoordinator objeto é schedule :

  • A schedule API enfileira um tf.function e retorna um futuro semelhante RemoteValue imediatamente.
  • As funções na fila serão enviados para os trabalhadores remotos em threads em segundo plano e sua RemoteValue s será preenchido de forma assíncrona.
  • Desde schedule não requer atribuição trabalhador, o tf.function passou no pode ser executado em qualquer trabalhador disponível.
  • Se o trabalhador em que ela é executada ficar indisponível antes de sua conclusão, a função será tentada novamente em outro trabalhador disponível.
  • Devido a esse fato e ao fato de que a execução da função não é atômica, uma função pode ser executada mais de uma vez.

Além de despachar funções remotas, o ClusterCoordinator também ajuda a criar conjuntos de dados sobre todos os trabalhadores e reconstruir esses conjuntos de dados Quando um trabalhador se recupera de fracasso.

Configuração do tutorial

O tutorial vai ramificar em Model.fit e personalizados caminhos de circuito de treinamento, e você pode escolher aquele que se adapta às suas necessidades. Seções diferentes de "Treinamento com X" são aplicáveis ​​a ambos os caminhos.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

Configuração de cluster

Como mencionado acima, um cluster formação servidor parâmetro requer uma tarefa coordenador que executa o seu programa de treinamento, um ou vários trabalhadores e tarefas do servidor de parâmetros que executar TensorFlow Servidores- tf.distribute.Server -e possivelmente uma tarefa de avaliação adicional que a avaliação é executada side-car (veja a seção de avaliação do carro lateral abaixo). Os requisitos para configurá-los são:

  • A tarefa do coordenador precisa saber os endereços e portas de todos os outros servidores TensorFlow, exceto o avaliador.
  • Os workers e servidores de parâmetros precisam saber em qual porta precisam escutar. Para simplificar, geralmente você pode passar as informações completas do cluster ao criar servidores TensorFlow nessas tarefas.
  • A tarefa do avaliador não precisa saber a configuração do cluster de treinamento. Em caso afirmativo, ele não deve tentar se conectar ao cluster de treinamento.
  • Trabalhadores e servidores de parâmetros devem ter tipos de tarefas como "worker" e "ps" , respectivamente. O coordenador deve usar "chief" como o tipo de tarefa por razões de legado.

Neste tutorial, você criará um cluster em processo para que todo o treinamento do servidor de parâmetros possa ser executado no Colab. Você vai aprender como configurar os clusters reais em uma seção posterior.

Cluster em processo

Você começará criando vários servidores TensorFlow com antecedência e se conectará a eles mais tarde. Note-se que este é apenas para fins de demonstração deste tutorial, e no treinamento real dos servidores será iniciada "worker" e "ps" máquinas.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

A configuração de cluster em-processo é frequentemente utilizado em testes de unidade, tal como aqui .

Outra opção para os testes locais é lançar processos na máquina-check locais fora formação Multi-trabalhador com Keras para um exemplo desta abordagem.

Instancie um ParameterServerStrategy

Antes de mergulhar no código formação, vamos instanciar um ParameterServerStrategy objeto. Note que isto é necessário, independentemente de você estão a prosseguir com Model.fit ou um ciclo de treinamento personalizado. O variable_partitioner argumento será explicado na seção de fragmentação variável .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

Para usar GPUs para treinamento, aloque GPUs visíveis para cada trabalhador. ParameterServerStrategy usará todas as GPUs disponíveis em cada trabalhador, com a restrição de que todos os trabalhadores devem ter o mesmo número de GPUs disponíveis.

Fragmentação de variável

Fragmentação variável refere-se a divisão de uma variável em múltiplas variáveis menores, que são chamados fragmentos. A fragmentação de variável pode ser útil para distribuir a carga da rede ao acessar esses fragmentos. Também é útil distribuir computação e armazenamento de uma variável normal em vários servidores de parâmetros.

Para habilitar fragmentação variável, você pode passar em um variable_partitioner ao construir uma ParameterServerStrategy objeto. O variable_partitioner será chamado cada vez que quando uma variável é criado e é esperado para retornar o número de fragmentos ao longo de cada dimensão da variável. Alguns out-of-box variable_partitioner s são fornecidos como tf.distribute.experimental.partitioners.MinSizePartitioner . Recomenda-se usar partitioners baseada em tamanho como tf.distribute.experimental.partitioners.MinSizePartitioner para evitar o particionamento pequenas variáveis, que podem ter impacto negativo sobre a velocidade de treinamento do modelo.

Quando um variable_partitioner é passado e se você criar uma variável diretamente sob strategy.scope() , ele se tornará um tipo de recipiente com uma variables propriedade que dá acesso à lista de cacos. Na maioria dos casos, esse contêiner será convertido automaticamente em um Tensor concatenando todos os fragmentos. Como resultado, ele pode ser usado como uma variável normal. Por outro lado, alguns métodos TensorFlow tais como tf.nn.embedding_lookup fornecer implementação eficiente para este tipo de recipiente e nestes métodos concatenação automática vai ser evitados.

Consulte a documentação de API de tf.distribute.experimental.ParameterServerStrategy para mais detalhes.

Treinando com Model.fit

Keras fornece uma API formação easy-to-use através Model.fit que manipula o loop de formação sob o capô, com a flexibilidade de substituível train_step , e callbacks, que fornecem funcionalidades, tais como a poupança checkpoint ou resumo poupança para TensorBoard. Com Model.fit , o mesmo código de treinamento pode ser usado para outras estratégias com uma simples troca do objeto estratégia.

Dados de entrada

Model.fit com formação servidor parâmetro requer que os dados de entrada ser fornecidos em uma função que recebe um único argumento do tipo tf.distribute.InputContext , e retorna um tf.data.Dataset . Em seguida, crie um tf.keras.utils.experimental.DatasetCreator objeto que leva tal callable , e um opcional tf.distribute.InputOptions objeto via input_options argumento.

Note que é recomendado para embaralhar e repetir os dados com formação servidor parâmetro e especificar steps_per_epoch no fit chamada para a biblioteca conhece os limites marcaram época.

Consulte a entrada Distribuído tutorial para mais informações sobre o InputContext argumento.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

O código na dataset_fn será invocado no dispositivo de entrada, que é geralmente o CPU, em cada uma das máquinas de trabalho.

Construção e compilação do modelo

Agora, você vai criar um tf.keras.Model trivial -a tf.keras.models.Sequential modelo para fins de demonstração seguidas por um Model.compile chamada para incorporar componentes, como um otimizador, métricas, ou parâmetros, tais como steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Chamadas de retorno e treinamento

Antes de chamar model.fit para o treinamento real, vamos preparar os retornos necessários para tarefas comuns, tais como:

  • ModelCheckpoint : para salvar os pesos modelo.
  • BackupAndRestore : para garantir que o progresso do treinamento é feito o backup automático, e recuperado se a experiências de fragmentação indisponibilidade (como abort ou preempção); ou
  • TensorBoard : para salvar os relatórios de progresso em arquivos de resumo, que se visualizadas na ferramenta TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

Uso direto com ClusterCoordinator (opcional)

Mesmo se você escolher o Model.fit percurso de formação, você pode, opcionalmente, instanciar um tf.distribute.experimental.coordinator.ClusterCoordinator objeto para agendar outras funções que você gostaria de ser executado sobre os trabalhadores. Veja o treino com um circuito de treinamento personalizado seção para mais detalhes e exemplos.

Treinamento com um loop de treinamento personalizado

Usando loops de treinamento personalizado com tf.distribute.Strategy oferece uma grande flexibilidade para definir ciclos de treinamento. Com a ParameterServerStrategy definido acima (como strategy ), você vai usar um tf.distribute.experimental.coordinator.ClusterCoordinator para despachar a execução de etapas de treinamento para trabalhadores remotos.

Então, você vai criar um modelo, definir um conjunto de dados e uma função de etapa, como você tem feito no circuito de treinamento com outros tf.distribute.Strategy s. Você pode encontrar mais detalhes no treinamento personalizado com tf.distribute.Strategy tutorial.

Para garantir prefetching conjunto de dados eficiente, use o recomendado distribuído conjunto de dados APIs de criação mencionados nas etapas de treinamento expedição para trabalhadores remotos seção abaixo. Além disso, certifique-se chamar Strategy.run dentro worker_fn para tirar o máximo proveito de GPUs alocados aos trabalhadores. O restante das etapas é o mesmo para treinamento com ou sem GPUs.

Vamos criar esses componentes nas seguintes etapas:

Configure os dados

Primeiro, escreva uma função que cria um conjunto de dados que inclui o pré-processamento lógica implementada por camadas de pré-processamento Keras .

Você vai criar essas camadas fora da dataset_fn mas aplicar a transformação dentro da dataset_fn , já que você vai embrulhar o dataset_fn em um tf.function , que não permite que as variáveis a ser criado dentro dela.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Gere exemplos de brinquedos em um conjunto de dados:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Em seguida, crie o conjunto de dados de treinamento envolvido em um dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construir o modelo

Em seguida, crie o modelo e outros objetos. Certifique-se de criar todas as variáveis sob strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Vamos confirmar que o uso de FixedShardsPartitioner dividir todas as variáveis em dois fragmentos e cada fragmento foi designado para servidores diferentes parâmetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Defina a etapa de treinamento

Em terceiro lugar, criar a etapa de formação envolveu em um tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Na função etapa de treinamento acima, chamando Strategy.run e Strategy.reduce na step_fn pode suportar múltiplos GPUs por trabalhador. Se os trabalhadores têm GPUs alocado, Strategy.run vai distribuir os conjuntos de dados em várias réplicas.

Despache etapas de treinamento para trabalhadores remotos

Depois de todos os cálculos são definidos por ParameterServerStrategy , você vai usar o tf.distribute.experimental.coordinator.ClusterCoordinator classe para criar recursos e distribuir as etapas de treinamento para trabalhadores remotos.

Vamos primeiro criar um ClusterCoordinator objeto e passe o objeto de estratégia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Em seguida, crie um conjunto de dados por trabalhador e um iterador. Nos per_worker_dataset_fn abaixo, envolvendo o dataset_fn em strategy.distribute_datasets_from_function é recomendado para permitir a pré-busca eficiente para GPUs perfeitamente.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

O passo final é a de distribuir a computação aos trabalhadores remotos usando ClusterCoordinator.schedule :

  • O schedule método enfileira um tf.function e retorna um futuro semelhante RemoteValue imediatamente. As funções na fila serão enviados para os trabalhadores remotos em threads em segundo plano eo RemoteValue será preenchido de forma assíncrona.
  • A join método ( ClusterCoordinator.join ) pode ser usado para esperar até que todas as funções programadas são executados.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Aqui está como você pode buscar o resultado de uma RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Como alternativa, você pode iniciar todas as etapas e fazer algo enquanto espera a conclusão:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para o treinamento completo e servindo de fluxo de trabalho para este exemplo particular, por favor, confira este teste .

Mais sobre a criação de conjuntos de dados

O conjunto de dados no código acima é criado usando o ClusterCoordinator.create_per_worker_dataset API). Ele cria um conjunto de dados por trabalhador e retorna um objeto de contêiner. Você pode chamar o iter método nele para criar um iterador por trabalhador. O per-trabalhador iteração contém uma iteração por trabalhador e a fatia correspondente de um trabalhador irá ser substituído no argumento de entrada da função passado para o ClusterCoordinator.schedule método antes da função é executado em um determinado trabalhador.

Actualmente, o ClusterCoordinator.schedule método assume trabalhadores são equivalentes e, portanto, assume os conjuntos de dados sobre diferentes trabalhadores são os mesmos, excepto que eles podem ser baralhado de forma diferente se contiverem um Dataset.shuffle operação. Devido a isso, também é recomendado que os conjuntos de dados a ser repetido indefinidamente e você agendar um número finito de passos em vez de confiar na OutOfRangeError a partir de um conjunto de dados.

Outra observação importante é que tf.data conjuntos de dados não suportam serialização implícita e desserialização além das fronteiras da tarefa. Por isso, é importante criar todo o conjunto de dados dentro da função passado para ClusterCoordinator.create_per_worker_dataset .

Avaliação

Há mais de uma maneira de definir e executar um loop de avaliação no treinamento distribuído. Cada um tem seus prós e contras, conforme descrito a seguir. O método de avaliação em linha é recomendado se você não tiver uma preferência.

Avaliação inline

Neste método, os suplentes coordenador entre formação e avaliação e, portanto, ele é chamado ele de avaliação em linha.

Existem vários benefícios da avaliação em linha. Por exemplo:

  • Ele pode oferecer suporte a grandes modelos de avaliação e conjuntos de dados de avaliação que uma única tarefa não pode conter.
  • Os resultados da avaliação podem ser usados ​​para tomar decisões para o treinamento da próxima época.

Existem duas maneiras de implementar a avaliação inline: avaliação direta e avaliação distribuída.

  • Avaliação direta: Para pequenos modelos e conjuntos de dados de avaliação, o coordenador pode executar a avaliação diretamente no modelo distribuído com o conjunto de dados de avaliação do coordenador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Avaliação distribuída: Para grandes modelos ou conjuntos de dados que são inviáveis para executar diretamente sobre o coordenador, a tarefa coordenador pode distribuir tarefas de avaliação para os trabalhadores através dos ClusterCoordinator.schedule / ClusterCoordinator.join métodos:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Avaliação do carro lateral

Outro método é chamado de avaliação side-car onde você cria uma tarefa avaliador dedicado que repetidamente lê checkpoints e corre de avaliação sobre a mais recente checkpoint. Ele permite que seu programa de treinamento termine mais cedo se você não precisar alterar seu ciclo de treinamento com base nos resultados da avaliação. No entanto, requer uma tarefa de avaliador adicional e pontos de verificação periódicos para acionar a avaliação. A seguir está um possível ciclo de avaliação do side-car:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clusters no mundo real

Em um ambiente de produção real, você executará todas as tarefas em diferentes processos em diferentes máquinas. A maneira mais simples de informações de cluster configure em cada tarefa é definir "TF_CONFIG" variáveis de ambiente e usar um tf.distribute.cluster_resolver.TFConfigClusterResolver para analisar "TF_CONFIG" .

Para uma descrição geral sobre "TF_CONFIG" variáveis de ambiente, consulte o treinamento Distribuído guia.

Se você começar suas tarefas de treinamento usando Kubernetes ou outros modelos de configuração, é muito provável que esses modelos já definiu “TF_CONFIG" para você.

Defina o "TF_CONFIG" variável de ambiente

Suponha que você tem 3 trabalhadores e 2 servidores de parâmetros, o "TF_CONFIG" do trabalhador 1 podem ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

O "TF_CONFIG" do avaliador pode ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

O "cluster" parte na acima "TF_CONFIG" string para o avaliador é opcional.

Se você usar o mesmo binário para todas as tarefas

Se preferir executar todas essas tarefas usando um único binário, você precisará permitir que seu programa se ramifique em funções diferentes logo no início:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

O código a seguir inicia um servidor TensorFlow e espera:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Lidando com falha de tarefa

Falha do trabalhador

tf.distribute.experimental.coordinator.ClusterCoordinator ou Model.fit fornecem built-in tolerância a falhas para o fracasso do trabalhador. Após a recuperação do trabalhador, a função anteriormente fornecidas conjunto de dados (tanto para ClusterCoordinator.create_per_worker_dataset para um loop de treinamento personalizado ou tf.keras.utils.experimental.DatasetCreator para Model.fit ) será invocado sobre os trabalhadores para re-criar os conjuntos de dados.

Servidor de parâmetro ou falha do coordenador

No entanto, quando o coordenador vê um erro no servidor parâmetro, ele irá gerar um UnavailableError ou AbortedError imediatamente. Você pode reiniciar o coordenador neste caso. O próprio coordenador também pode ficar indisponível. Portanto, certas ferramentas são recomendadas para não perder o progresso do treinamento:

  • Para Model.fit , você deve usar um BackupAndRestore callback, que lida com a economia de progresso e de restauração automaticamente. Veja Callbacks e formação seção acima para um exemplo.

  • Para um loop de treinamento personalizado, você deve verificar as variáveis ​​do modelo periodicamente e carregar as variáveis ​​do modelo a partir de um ponto de verificação, se houver, antes do início do treinamento. O progresso do treinamento pode ser inferida a partir de aproximadamente optimizer.iterations se um otimizador é checkpointed:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Buscando um RemoteValue

Buscando um RemoteValue é garantia de sucesso se uma função é executada com sucesso. Isso ocorre porque atualmente o valor de retorno é imediatamente copiado para o coordenador após a execução de uma função. Se houver alguma falha do trabalhador durante a cópia, a função será tentada novamente em outro trabalhador disponível. Portanto, se você deseja otimizar o desempenho, pode agendar funções sem um valor de retorno.

Relatório de erros

Uma vez que o coordenador vê um erro, como UnavailableError a partir de servidores de parâmetros ou outros erros de aplicação, tais como um InvalidArgument de tf.debugging.check_numerics , ele irá cancelar todas as funções pendentes e em fila antes de levantar o erro. A obtenção do seu correspondente RemoteValue s vai elevar um CancelledError .

Depois que um erro é gerado, o coordenador não levantará o mesmo erro ou qualquer erro de funções canceladas.

Melhoria de desempenho

Há várias razões possíveis, se você ver problemas de desempenho quando você treina com ParameterServerStrategy e ClusterResolver .

Um motivo comum é que os servidores de parâmetros têm carga desequilibrada e alguns servidores de parâmetros muito carregados atingiram a capacidade. Também pode haver várias causas raiz. Alguns métodos simples para atenuar esse problema são:

  1. Shard seus grandes variáveis do modelo via especificando um variable_partitioner ao construir uma ParameterServerStrategy .
  2. Evite criar uma variável de ponto de acesso exigida por todos os servidores de parâmetros em uma única etapa, se possível. Por exemplo, use uma taxa constante aprendizado ou subclasse tf.keras.optimizers.schedules.LearningRateSchedule em otimizadores desde o comportamento padrão é que a taxa de aprendizagem se torne uma variável colocado em um servidor determinado parâmetro e solicitado por todos os outros servidores de parâmetros em cada etapa .
  3. Misture seus grandes vocabulários antes de passá-los para as camadas de pré-processamento do Keras.

Outra possível razão para problemas de desempenho é o coordenador. Sua primeira implementação da schedule / join é baseada em Python e, portanto, pode ter enfiar sobrecarga. Além disso, a latência entre o coordenador e os trabalhadores pode ser grande. Se esse é o caso,

  • Para Model.fit , você pode definir steps_per_execution argumentação, desde a Model.compile para um valor maior que 1.

  • Para um loop de treinamento personalizado, você pode embalar várias etapas em um único tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Como a biblioteca é otimizada ainda mais, a maioria dos usuários não terá que empacotar manualmente as etapas no futuro.

Além disso, um pequeno truque para melhoria de desempenho é agendar funções sem um valor de retorno, conforme explicado na seção de manipulação de falha de tarefa acima.

Limitações conhecidas

A maioria das limitações conhecidas já foi abordada nas seções acima. Esta seção fornece um resumo.

ParameterServerStrategy geral

  • os.environment["grpc_fail_fast"]="use_caller" é necessário em todas as tarefas, incluindo o coordenador, para fazer culpa trabalho tolerância corretamente.
  • O treinamento do servidor de parâmetros síncronos não é compatível.
  • Geralmente, é necessário agrupar várias etapas em uma única função para obter o desempenho ideal.
  • Ele não é suportada para carregar um saved_model através tf.saved_model.load contendo variáveis Sharded. Espera-se que o carregamento de um modelo saved_model usando o TensorFlow Serving funcione.
  • Não há suporte para carregar um ponto de verificação contendo variáveis ​​de slot do otimizador fragmentado em um número diferente de fragmentos.
  • Não há suporte para a recuperação de falha do servidor de parâmetro sem reiniciar a tarefa do coordenador.
  • Uso de tf.lookup.StaticHashTable (que é comumente empregado por alguns tf.keras.layers.experimental.preprocessing camadas, como IntegerLookup , StringLookup e TextVectorization ) resulta em recursos colocados no coordenador neste momento com formação servidor parâmetro. Isso tem implicações de desempenho para RPCs de pesquisa de trabalhadores para o coordenador. Esta é uma prioridade alta atual para abordar.

Model.fit especificidades

  • steps_per_epoch argumento é obrigatório em Model.fit . Você pode selecionar um valor que forneça intervalos apropriados em uma época.
  • ParameterServerStrategy não tem suporte para retornos de chamada personalizados que têm chamadas de nível de lote por motivos de desempenho. Você deve converter essas chamadas em chamadas de nível época com adequadamente escolhido steps_per_epoch , de modo que eles são chamados todos os steps_per_epoch número de passos. Os retornos de chamada integrados não são afetados: suas chamadas em nível de lote foram modificadas para ter um bom desempenho. Apoiar chamadas de nível de lote para ParameterServerStrategy está sendo planejado.
  • Pelo mesmo motivo, ao contrário de outras estratégias, a barra de progresso e as métricas são registradas apenas nos limites de época.
  • run_eagerly não é suportado.

Especificações do loop de treinamento personalizado