¡Reserva! Google I / O regresa del 18 al 20 de mayo Regístrese ahora
Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Capacitación de varios trabajadores con Estimator

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Visión general

Este tutorial demuestra cómo se puede utilizar tf.distribute.Strategy para la formación distribuida de varios trabajadores con tf.estimator . Si escribe su código usando tf.estimator y está interesado en escalar más allá de una sola máquina con alto rendimiento, este tutorial es para usted.

Antes de comenzar, lea la guía de estrategia de distribución . El tutorial de entrenamiento de múltiples GPU también es relevante, porque este tutorial usa el mismo modelo.

Configuración

Primero, configure TensorFlow y las importaciones necesarias.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Función de entrada

Este instructivo usa el conjunto de datos MNIST de TensorFlow Datasets . El código aquí es similar al tutorial de capacitación de múltiples GPU con una diferencia clave: cuando se usa Estimator para la capacitación de múltiples trabajadores, es necesario dividir el conjunto de datos por la cantidad de trabajadores para garantizar la convergencia del modelo. Los datos de entrada se fragmentan por índice de trabajador, de modo que cada trabajador procesa 1/num_workers porciones distintas del conjunto de datos.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Otro enfoque razonable para lograr la convergencia sería mezclar el conjunto de datos con semillas distintas en cada trabajador.

Configuración de varios trabajadores

Una de las diferencias clave en este tutorial (en comparación con el tutorial de formación de varias GPU ) es la configuración de varios trabajadores. La variable de entorno TF_CONFIG es la forma estándar de especificar la configuración del clúster para cada trabajador que forma parte del clúster.

Hay dos componentes de TF_CONFIG : cluster y task . cluster proporciona información sobre todo el clúster, es decir, los trabajadores y los servidores de parámetros del clúster. task proporciona información sobre la tarea actual. El primer cluster componentes es el mismo para todos los trabajadores y servidores de parámetros del grupo, y la segunda task componentes es diferente en cada trabajador y servidor de parámetros y especifica su propio type e index . En este ejemplo, el type tarea es worker y el index tarea es 0 .

Con fines ilustrativos, este tutorial muestra cómo configurar un TF_CONFIG con 2 trabajadores en localhost . En la práctica, crearía varios trabajadores en una dirección IP y un puerto externos, y establecería TF_CONFIG en cada trabajador de forma adecuada, es decir, modificaría el index tareas.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Definir el modelo

Escribe las capas, el optimizador y la función de pérdida para el entrenamiento. Este tutorial define el modelo con capas de Keras, similar al tutorial de entrenamiento multi-GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Para entrenar el modelo, use una instancia de tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy crea copias de todas las variables en las capas del modelo en cada dispositivo en todos los trabajadores. Utiliza CollectiveOps , una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas. La guía tf.distribute.Strategy tiene más detalles sobre esta estrategia.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From <ipython-input-1-f1f424df316e>:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Entrenar y evaluar el modelo

A continuación, especifique la estrategia de distribución en RunConfig para el estimador, y entrene y evalúe invocando tf.estimator.train_and_evaluate . Este tutorial distribuye solo la capacitación especificando la estrategia a través de train_distribute . También es posible distribuir la evaluación a través de eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7fcb555b1240>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.2785115, step = 0
INFO:tensorflow:loss = 2.2785115, step = 0
INFO:tensorflow:global_step/sec: 211.696
INFO:tensorflow:global_step/sec: 211.696
INFO:tensorflow:loss = 2.2806711, step = 100 (0.474 sec)
INFO:tensorflow:loss = 2.2806711, step = 100 (0.474 sec)
INFO:tensorflow:global_step/sec: 234.577
INFO:tensorflow:global_step/sec: 234.577
INFO:tensorflow:loss = 2.2909079, step = 200 (0.426 sec)
INFO:tensorflow:loss = 2.2909079, step = 200 (0.426 sec)
INFO:tensorflow:global_step/sec: 235.364
INFO:tensorflow:global_step/sec: 235.364
INFO:tensorflow:loss = 2.267106, step = 300 (0.425 sec)
INFO:tensorflow:loss = 2.267106, step = 300 (0.425 sec)
INFO:tensorflow:global_step/sec: 239.335
INFO:tensorflow:global_step/sec: 239.335
INFO:tensorflow:loss = 2.3009024, step = 400 (0.420 sec)
INFO:tensorflow:loss = 2.3009024, step = 400 (0.420 sec)
INFO:tensorflow:global_step/sec: 236.559
INFO:tensorflow:global_step/sec: 236.559
INFO:tensorflow:loss = 2.2788124, step = 500 (0.422 sec)
INFO:tensorflow:loss = 2.2788124, step = 500 (0.422 sec)
INFO:tensorflow:global_step/sec: 235.906
INFO:tensorflow:global_step/sec: 235.906
INFO:tensorflow:loss = 2.296442, step = 600 (0.424 sec)
INFO:tensorflow:loss = 2.296442, step = 600 (0.424 sec)
INFO:tensorflow:global_step/sec: 232.171
INFO:tensorflow:global_step/sec: 232.171
INFO:tensorflow:loss = 2.2674422, step = 700 (0.428 sec)
INFO:tensorflow:loss = 2.2674422, step = 700 (0.428 sec)
INFO:tensorflow:global_step/sec: 256.679
INFO:tensorflow:global_step/sec: 256.679
INFO:tensorflow:loss = 2.2554998, step = 800 (0.389 sec)
INFO:tensorflow:loss = 2.2554998, step = 800 (0.389 sec)
INFO:tensorflow:global_step/sec: 645.399
INFO:tensorflow:global_step/sec: 645.399
INFO:tensorflow:loss = 2.2589102, step = 900 (0.155 sec)
INFO:tensorflow:loss = 2.2589102, step = 900 (0.155 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-03-25T01:25:44Z
INFO:tensorflow:Starting evaluation at 2021-03-25T01:25:44Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 2.03002s
INFO:tensorflow:Inference Time : 2.03002s
INFO:tensorflow:Finished evaluation at 2021-03-25-01:25:46
INFO:tensorflow:Finished evaluation at 2021-03-25-01:25:46
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2567787
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2567787
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.1219478.
INFO:tensorflow:Loss for final step: 1.1219478.
({'loss': 2.2567787, 'global_step': 938}, [])

Optimiza el rendimiento del entrenamiento

Ahora tiene un modelo y un Estimador con capacidad para varios trabajadores con tecnología tf.distribute.Strategy . Puede probar las siguientes técnicas para optimizar el rendimiento de la formación de varios trabajadores:

  • Aumentar el tamaño del lote: el tamaño del lote especificado aquí es por GPU. En general, se recomienda el tamaño de lote más grande que se ajuste a la memoria de la GPU.
  • Transmitir variables: tf.float las variables a tf.float si es posible. El modelo oficial de ResNet incluye un ejemplo de cómo se puede hacer esto.
  • Utilice la comunicación colectiva: MultiWorkerMirroredStrategy proporciona múltiples implementaciones de comunicación colectiva .

    • RING implementa colectivos basados ​​en anillos que utilizan gRPC como capa de comunicación entre hosts.
    • NCCL utiliza NCCL Nvidia para implementar colectivos.
    • AUTO difiere la elección al tiempo de ejecución.

    La mejor opción de implementación colectiva depende del número y tipo de GPU y de la interconexión de red en el clúster. Para anular la elección automática, especifique un valor válido para el parámetro de communication del constructor de MultiWorkerMirroredStrategy , por ejemplo, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Visite la sección Rendimiento de la guía para obtener más información sobre otras estrategias y herramientas que puede utilizar para optimizar el rendimiento de sus modelos de TensorFlow.

Otros ejemplos de código

  1. Ejemplo de extremo a extremo para la capacitación de varios trabajadores en tensorflow / ecosistema utilizando plantillas de Kubernetes. Este ejemplo comienza con un modelo de Keras y lo convierte en un Estimator usando la API tf.keras.estimator.model_to_estimator .
  2. Modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.