RSVP para seu evento TensorFlow Everywhere hoje mesmo!
Esta página foi traduzida pela API Cloud Translation.
Switch to English

Treinamento de servidor de parâmetros

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

O treinamento do servidor de parâmetros é um método comum de dados paralelos para aumentar o treinamento do modelo em várias máquinas. Um cluster de treinamento de servidor de parâmetro consiste em trabalhadores e servidores de parâmetro. As variáveis ​​são criadas em servidores de parâmetros e são lidas e atualizadas pelos trabalhadores em cada etapa. Por padrão, os trabalhadores leem e atualizam essas variáveis ​​independentemente, sem sincronizar uns com os outros. É por isso que às vezes o treinamento do tipo servidor de parâmetro é chamado de treinamento assíncrono.

O treinamento do servidor de parâmetros do TensorFlow 2 usa um coordenador central por meio da classe tf.distribute.experimental.coordinator.ClusterCoordinator .

Nesta implementação, as tarefas do worker e do parameter server executam tf.distribute.Server s que tf.distribute.Server às solicitações do coordenador. O coordenador cria recursos, despacha tarefas de treinamento, escreve pontos de verificação e lida com falhas de tarefas.

Acreditamos que essa arquitetura e a nova classe ClusterCoordinator fornecem um modelo de programação mais flexível e simples.

ClusterCoordinator

A classe ClusterCoordinator precisa trabalhar em conjunto com um objeto tf.distribute.Strategy . Este objeto tf.distribute.Strategy é necessário para passar as informações do cluster e é usado para definir uma etapa de treinamento, como vimos no treinamento personalizado com MirroredStrategy . O objeto ClusterCoordinator então despacha a execução dessas etapas de treinamento para trabalhadores remotos. Atualmente, o ClusterCoordinator funciona apenas com tf.distribute.experimental.ParameterServerStrategy .

A API mais importante fornecida pelo objeto ClusterCoordinator é a schedule . A API de schedule enfileira um tf.function e retorna um RemoteValue semelhante ao RemoteValue imediatamente. As funções enfileiradas serão despachadas para trabalhadores remotos em threads de fundo e seus RemoteValue s serão preenchidos de forma assíncrona. Como a schedule não requer atribuição de trabalhador, o tf.function transmitido pode ser executado em qualquer trabalhador disponível. Se o trabalhador em que ela é executada ficar indisponível antes de sua conclusão, a função será tentada novamente em outro trabalhador disponível. Devido a esse fato e ao fato de que a execução da função não é atômica, uma função pode ser executada mais de uma vez.

Além de enviar funções remotas, o ClusterCoordinator também ajuda a criar conjuntos de dados em todos os trabalhadores e reconstruir esses conjuntos de dados quando um trabalhador se recupera de uma falha.

Configuração do tutorial

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Configuração de cluster

Conforme mencionado acima, um cluster de treinamento de servidor de parâmetro requer uma tarefa de coordenador que executa seu programa de treinamento, um ou vários workers e tarefas de servidor de parâmetro que executam servidores TensorFlow, ou seja, tf.distribute.Server , e possivelmente uma tarefa de avaliação adicional que executa side-car avaliação (veja a seção de avaliação do carro lateral abaixo). Os requisitos para configurá-los são:

  • A tarefa do coordenador precisa saber os endereços e portas de todos os outros servidores TensorFlow, exceto o avaliador.
  • Os workers e servidores de parâmetros precisam saber em qual porta precisam escutar. Para simplificar, normalmente passamos as informações completas do cluster ao criar servidores TensorFlow nessas tarefas.
  • A tarefa do avaliador não precisa saber a configuração do cluster de treinamento. Em caso afirmativo, ele não deve tentar se conectar ao cluster de treinamento.
  • Trabalhadores e servidores de parâmetros devem ter tipos de tarefas como “trabalhador” e “ps” respectivamente. O coordenador deve usar “chefe” como o tipo de tarefa por motivos de legado.

Neste tutorial, criaremos um cluster em processo para que todo o treinamento do servidor de parâmetros possa ser executado no colab. Apresentaremos como configurar clusters reais em uma seção posterior.

Cluster em processo

Neste tutorial, iniciaremos vários servidores TensorFlow com antecedência e nos conectaremos a eles mais tarde:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Treinamento com loop de treinamento personalizado

O loop de treinamento personalizado com tf.distribute.Strategy oferece grande flexibilidade para definir os loops de treinamento. Atualmente, para o treinamento do servidor de parâmetros no TensorFlow 2, apenas o loop de treinamento personalizado é compatível. Aqui, usamos ParameterServerStrategy para definir uma etapa de treinamento e, em seguida, usamos ClusterCoordinator para despachar a execução das etapas de treinamento para trabalhadores remotos.

Crie o ParameterServerStrategy

Para escrever uma etapa de treinamento no loop de treinamento personalizado, a primeira etapa é criar um ParameterServerStrategy . Explicaremos o variable_partitioner mais tarde.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

Em seguida, você criará um modelo, definirá um conjunto de dados e uma função de etapa como vimos no loop de treinamento com outros tf.distribute.Strategy s. Você pode encontrar mais detalhes neste tutorial . Vamos criar esses componentes nas seguintes etapas:

Configure os dados

Primeiro, escreva uma função que crie um conjunto de dados que inclui a lógica de pré-processamento implementada pelas camadas de pré-processamento de Keras. dataset_fn essas camadas fora de dataset_fn mas aplicaremos a transformação dentro de dataset_fn pois você envolverá dataset_fn em um tf.function que não permite que variáveis ​​sejam criadas dentro dele.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Gere exemplos de brinquedos em um conjunto de dados:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Em seguida, criamos o conjunto de dados de treinamento envolvido em um dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construir o modelo

Em segundo lugar, criamos o modelo e outros objetos. Certifique-se de criar todas as variáveis ​​em strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Defina a etapa de treinamento

Terceiro, crie a etapa de treinamento envolvida em uma tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Na função da etapa acima, chamar strategy.run e strategy.reduce no step_fn é útil para dar suporte a GPUs ou trabalhador de múltiplas réplicas no futuro, embora tenham uma implementação trivial no momento.

Enviar etapas de treinamento para trabalhadores remotos

Depois que todos os cálculos forem definidos por ParameterServerStrategy , usaremos a classe ClusterCoordinator para criar recursos e distribuir as etapas de treinamento para trabalhadores remotos.

Vamos primeiro criar um objeto ClusterCoordinator e passar o objeto de estratégia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Em seguida, criamos um conjunto de dados por trabalhador e um iterador. No per_worker_dataset_fn abaixo, envolver o dataset_fn em strategy.distribute_datasets_from_function é opcional, mas permitirá o suporte de pré-busca eficiente para GPUs perfeitamente no futuro quando as GPUs forem compatíveis com ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

A etapa final é distribuir a computação para funcionários remotos usando o schedule . O método de schedule enfileira um tf.function e retorna um RemoteValue semelhante ao RemoteValue imediatamente. As funções enfileiradas serão despachadas para trabalhadores remotos em threads em segundo plano e o RemoteValue será preenchido de forma assíncrona. O método de join pode ser usado para esperar até que todas as funções programadas sejam executadas.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Aqui está como você pode buscar o resultado de um RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

Como alternativa, você pode iniciar todas as etapas e fazer algo enquanto espera a conclusão:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Para o treinamento completo e fluxo de trabalho de serviço para este exemplo específico, confira este teste .

Mais sobre a criação de conjuntos de dados

O conjunto de dados no código acima é criado usando a API create_per_worker_dataset . Ele cria um conjunto de dados por trabalhador e retorna um objeto recipiente. Você pode chamar o método iter nele para criar um iterador por trabalhador. O iterador por trabalhador contém um iterador por trabalhador e a fatia correspondente de um trabalhador será substituída no argumento de entrada da função passada ao método de schedule antes que a função seja executada em um trabalhador específico.

Atualmente, o método de schedule assume que os trabalhadores são equivalentes e, portanto, assume que os conjuntos de dados em diferentes trabalhadores são os mesmos, exceto que eles podem ser embaralhados de forma diferente se contiverem uma operação dataset.shuffle Por isso, também recomendamos que os conjuntos de dados sejam repetidos indefinidamente e agendem um número finito de etapas, em vez de depender do OutOfRangeError de um conjunto de dados.

Outra observação importante é que os conjuntos de dados tf.data não oferecem suporte à serialização e desserialização implícita entre os limites da tarefa. Portanto, é importante criar todo o conjunto de dados dentro da função passada para create_per_worker_dataset .

Fragmentação de variável

Fragmentação de variável refere-se à divisão de uma variável em várias variáveis ​​menores. Chamamos essas variáveis ​​menores de shard s. A fragmentação de variável pode ser útil para distribuir a carga da rede ao acessar esses fragmentos. Também é útil distribuir computação e armazenamento de uma variável normal em vários servidores de parâmetros.

Para ativar o sharding de variável, você pode passar um variable_partitioner ao construir um objeto ParameterServerStrategy . O variable_partitioner será chamado sempre que uma variável for criada e espera-se que retorne o número de fragmentos ao longo de cada dimensão da variável. Alguns variable_partitioner s tf.distribute.experimental.partitioners.FixedShardsPartitioner para tf.distribute.experimental.partitioners.FixedShardsPartitioner , como tf.distribute.experimental.partitioners.FixedShardsPartitioner .

No exemplo acima, usamos o FixedShardsPartitioner que dividirá todas as variáveis ​​em dois fragmentos e cada fragmento será atribuído a diferentes servidores de parâmetros:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Quando um variable_partitioner é passado e se você criar uma variável diretamente em strategy.scope() , ele se tornará um tipo de contêiner com uma propriedade de variables que fornece acesso à lista de fragmentos. Na maioria dos casos, esse contêiner será automaticamente convertido em um Tensor concatenando todos os fragmentos. Como resultado, ele pode ser usado como uma variável normal. Por outro lado, alguns métodos do TensorFlow, como tf.nn.embedding_lookup fornecem implementação eficiente para esse tipo de contêiner e, nesses métodos, a concatenação automática será evitada.

Consulte a docstring API de ParameterServerStrategy para obter mais detalhes.

Avaliação

Há mais de uma maneira de definir e executar um loop de avaliação no treinamento distribuído. Cada um tem seus prós e contras, conforme descrito a seguir. O método de avaliação em linha é recomendado se você não tiver uma preferência.

Avaliação inline

Neste método o coordenador alterna entre treinamento e avaliação e, portanto, chamamos de avaliação inline. Existem vários benefícios da avaliação em linha. Por exemplo, pode suportar grandes modelos de avaliação e conjuntos de dados de avaliação que uma única tarefa não pode conter. Para outro exemplo, os resultados da avaliação podem ser usados ​​para tomar decisões para o treinamento da próxima época.

Existem duas maneiras de implementar a avaliação inline:

  • Avaliação direta - Para pequenos modelos e conjuntos de dados de avaliação, o coordenador pode executar a avaliação diretamente no modelo distribuído com o conjunto de dados de avaliação no coordenador:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • Avaliação distribuída - Para grandes modelos ou conjuntos de dados que são inviáveis ​​para executar diretamente no coordenador, a tarefa do coordenador pode distribuir tarefas de avaliação para os trabalhadores por meio dos métodos de schedule / join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Avaliação do carro lateral

Outro método é chamado de avaliação de carro lateral, que consiste em criar uma tarefa de avaliador dedicada que lê repetidamente os pontos de verificação e executa a avaliação em um ponto de verificação mais recente. Ele permite que seu programa de treinamento termine mais cedo se você não precisar alterar o ciclo de treinamento com base nos resultados da avaliação. No entanto, requer uma tarefa de avaliador adicional e pontos de verificação periódicos para acionar a avaliação. A seguir está um possível ciclo de avaliação do side-car:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clusters no mundo real

Em um ambiente de produção real, você executará todas as tarefas em diferentes processos em diferentes máquinas. A maneira mais simples de configurar as informações do cluster em cada tarefa é definir as variáveis ​​de ambiente "TF_CONFIG" e usar TFConfigClusterResolver para analisar "TF_CONFIG". Para obter uma descrição geral sobre as variáveis ​​de ambiente "TF_CONFIG", consulte o guia de treinamento distribuído .

Se você iniciar suas tarefas de treinamento usando Kubernetes ou outros modelos de configuração, é muito provável que esses modelos já tenham definido “TF_CONFIG” para você.

Defina a variável de ambiente “TF_CONFIG”

Suponha que você tenha 3 trabalhadores e 2 servidores de parâmetros, o “TF_CONFIG” do trabalhador 1 pode ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

O “TF_CONFIG” do avaliador pode ser:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

A parte “cluster” na string “TF_CONFIG” acima para o avaliador é opcional.

Se você usar o mesmo binário para todas as tarefas

Se preferir executar todas essas tarefas usando um único binário, você precisará permitir que seu programa se ramifique em funções diferentes logo no início:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

O código a seguir inicia um servidor TensorFlow e espera:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Lidando com Falha de Tarefa

Falha do trabalhador

Conforme mencionado acima, o ClusterCoordinator tem tolerância a falhas embutida para falhas do trabalhador. Após a recuperação do trabalhador, a fatia correspondente dos conjuntos de dados criados por create_per_worker_dataset que ainda estão no escopo será recriada invocando seu dataset_fn original passado para create_per_worker_dataset .

Servidor de parâmetro ou falha do coordenador

No entanto, quando o coordenador vê um erro de servidor de parâmetro, ele lançará um UnavailableError ou AbortedError imediatamente. Você pode reiniciar o coordenador neste caso. O próprio coordenador também pode ficar indisponível. Portanto, para não perder muito do progresso do treinamento, é importante verificar as variáveis ​​do modelo periodicamente e carregar as variáveis ​​do modelo em um ponto de verificação, se houver, antes do início do treinamento. O progresso do treinamento pode ser inferido aproximadamente a partir de optimizer.iterations se um otimizador for verificado.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Buscando um RemoteValue

A obtenção de um RemoteValue sucesso garantido se uma função for executada com êxito. Isso ocorre porque atualmente o valor de retorno é imediatamente copiado para o coordenador após a execução de uma função. Se houver alguma falha do trabalhador durante a cópia, a função será repetida em outro trabalhador disponível. Portanto, se você deseja otimizar o desempenho, pode agendar funções sem um valor de retorno.

Relatório de erros

Quando o coordenador vê um erro como UnavailableError de servidores de parâmetros ou outros erros de aplicativo, como InvalidArgument de tf.debugging.check_numerics , ele cancelará todas as funções pendentes e enfileiradas antes de gerar o erro. Buscar seus RemoteValue s correspondentes RemoteValue um CancelledError .

Depois que um erro é gerado, o coordenador não levantará o mesmo erro ou qualquer erro de funções canceladas.

Melhoria de desempenho

Existem vários motivos possíveis se você vir problemas de desempenho ao treinar com ParameterServerStrategy e ClusterResolver .

Um motivo comum é que os servidores de parâmetros têm carga desequilibrada e alguns servidores de parâmetros muito carregados atingiram a capacidade. Também pode haver várias causas raiz. Alguns métodos simples para atenuar esse problema são:

  1. fragmentar suas grandes variáveis ​​de modelo especificando um variable_partitioner ao construir um ParameterServerStrategy .
  2. evite criar uma variável de ponto de acesso exigida por todos os servidores de parâmetros em uma única etapa, se possível. Por exemplo, use uma taxa de aprendizado constante ou subclasse tf.keras.optimizers.schedules.LearningRateSchedule em otimizadores, pois o comportamento padrão é que a taxa de aprendizado se tornará uma variável colocada em um servidor de parâmetro específico e solicitada por todos os outros servidores de parâmetro em cada etapa .
  3. embaralhe seus grandes vocabulários antes de passá-los para as camadas de pré-processamento Keras.

Outro possível motivo para problemas de desempenho é o coordenador. Nossa primeira implementação de schedule / join é baseada em Python e, portanto, pode ter sobrecarga de threading. Além disso, a latência entre o coordenador e os trabalhadores pode ser grande. Se for esse o caso, você pode compactar várias etapas em uma única tf.function :

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Continuaremos otimizando o coordenador e, esperançosamente, a maioria dos usuários não terá que empacotar manualmente as etapas no futuro.

Além disso, um pequeno truque para melhoria de desempenho é agendar funções sem um valor de retorno, conforme explicado na seção de tratamento de falha de tarefa acima.

Limitações Conhecidas

A maioria das limitações conhecidas é abordada nas seções acima. Aqui está um resumo:

  • os.environment["grpc_fail_fast"]="use_caller" é necessário em todas as tarefas, incluindo o coordenador, para fazer a tolerância a falhas funcionar corretamente.
  • Trabalhadores da GPU não são compatíveis.
  • O treinamento do servidor de parâmetro síncrono não é compatível.
  • ParameterServerStrategy não funciona com Keras compile e fit APIs.
  • ClusterCoordinator.schedule não oferece suporte a garantias de visitação para um conjunto de dados.
  • Quando ClusterCoordinator.create_per_worker_dataset é usado, todo o conjunto de dados deve ser criado dentro da função passada a ele.
  • Geralmente, é necessário agrupar várias etapas em uma única função para obter o desempenho ideal.
  • Não há suporte para carregar um modelo saved por meio de tf.saved_model.load contendo variáveis ​​fragmentadas. Espera-se que o carregamento de um modelo saved_model usando o TensorFlow Serving funcione.
  • Não é suportado carregar um ponto de verificação contendo variáveis ​​de slot do otimizador sharded em um número diferente de shards.
  • Não há suporte para a recuperação de falha do servidor de parâmetro sem reiniciar a tarefa do coordenador.