Migrar de TPUEstimator a TPUStrategy

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar libreta

Esta guía demuestra cómo migrar sus flujos de trabajo que se ejecutan en TPU desde la API TPUStrategy de TensorFlow 1 a la API TPUEstimator de TensorFlow 2.

  • En TensorFlow 1, la API tf.compat.v1.estimator.tpu.TPUEstimator le permite entrenar y evaluar un modelo, así como realizar inferencias y guardar su modelo (para servir) en TPU (en la nube).
  • En TensorFlow 2, para realizar un entrenamiento sincrónico en TPU y pods de TPU (una colección de dispositivos de TPU conectados por interfaces de red de alta velocidad dedicadas), debe usar una estrategia de distribución de TPU tf.distribute.TPUStrategy . La estrategia puede funcionar con las API de Keras, incluida la creación de modelos ( tf.keras.Model ), optimizadores ( tf.keras.optimizers.Optimizer ) y entrenamiento ( Model.fit ), así como un ciclo de entrenamiento personalizado (con tf.function y tf.GradientTape ).

Para ver ejemplos completos de TensorFlow 2, consulta la guía Usar TPU , es decir, la sección Clasificación en TPU , y el tutorial Resolver tareas de GLUE usando BERT en TPU . También puede encontrar útil la guía de capacitación distribuida , que cubre todas las estrategias de distribución de TensorFlow, incluida TPUStrategy .

Configuración

Comience con importaciones y un conjunto de datos simple para fines de demostración:

import tensorflow as tf
import tensorflow.compat.v1 as tf1
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.11) doesn't match a supported version!
  RequestsDependencyWarning)
features = [[1., 1.5]]
labels = [[0.3]]
eval_features = [[4., 4.5]]
eval_labels = [[0.8]]

TensorFlow 1: Impulse un modelo en TPU con TPUEstimator

Esta sección de la guía demuestra cómo realizar capacitación y evaluación con tf.compat.v1.estimator.tpu.TPUEstimator en TensorFlow 1.

Para usar un TPUEstimator , primero defina algunas funciones: una función de entrada para los datos de entrenamiento, una función de entrada de evaluación para los datos de evaluación y una función de modelo que le dice al TPUEstimator cómo se define la operación de entrenamiento con las características y etiquetas:

def _input_fn(params):
  dataset = tf1.data.Dataset.from_tensor_slices((features, labels))
  dataset = dataset.repeat()
  return dataset.batch(params['batch_size'], drop_remainder=True)

def _eval_input_fn(params):
  dataset = tf1.data.Dataset.from_tensor_slices((eval_features, eval_labels))
  dataset = dataset.repeat()
  return dataset.batch(params['batch_size'], drop_remainder=True)

def _model_fn(features, labels, mode, params):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

Con esas funciones definidas, cree un tf.distribute.cluster_resolver.TPUClusterResolver que proporcione la información del clúster y un objeto tf.compat.v1.estimator.tpu.RunConfig . Junto con la función de modelo que ha definido, ahora puede crear un TPUEstimator . Aquí, simplificará el flujo al omitir los ahorros en el punto de control. Luego, especificará el tamaño del lote para el entrenamiento y la evaluación de TPUEstimator .

cluster_resolver = tf1.distribute.cluster_resolver.TPUClusterResolver(tpu='')
print("All devices: ", tf1.config.list_logical_devices('TPU'))
All devices:  []
tpu_config = tf1.estimator.tpu.TPUConfig(iterations_per_loop=10)
config = tf1.estimator.tpu.RunConfig(
    cluster=cluster_resolver,
    save_checkpoints_steps=None,
    tpu_config=tpu_config)
estimator = tf1.estimator.tpu.TPUEstimator(
    model_fn=_model_fn,
    config=config,
    train_batch_size=8,
    eval_batch_size=8)
WARNING:tensorflow:Estimator's model_fn (<function _model_fn at 0x7fef73ae76a8>) includes params argument, but params are not passed to Estimator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp_bkua7zf
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp_bkua7zf', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
cluster_def {
  job {
    name: "worker"
    tasks {
      key: 0
      value: "10.240.1.2:8470"
    }
  }
}
isolate_session_state: true
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': None, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({'worker': ['10.240.1.2:8470']}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://10.240.1.2:8470', '_evaluation_master': 'grpc://10.240.1.2:8470', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_tpu_config': TPUConfig(iterations_per_loop=10, num_shards=None, num_cores_per_replica=None, per_host_input_for_training=2, tpu_job_name=None, initial_infeed_sleep_secs=None, input_partition_dims=None, eval_training_input_configuration=2, experimental_host_call_every_n_steps=1, experimental_allow_per_host_v2_parallel_get_next=False, experimental_feed_hook=None), '_cluster': <tensorflow.python.distribute.cluster_resolver.tpu.tpu_cluster_resolver.TPUClusterResolver object at 0x7ff288b6aa20>}
INFO:tensorflow:_TPUContext: eval_on_tpu True

Llame a TPUEstimator.train para comenzar a entrenar el modelo:

estimator.train(_input_fn, steps=1)
INFO:tensorflow:Querying Tensorflow master (grpc://10.240.1.2:8470) for TPU system metadata.
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, -1, 2562214468325910549)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 17179869184, 7806191887455116208)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 17179869184, 4935096526614797404)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 17179869184, 6208852770722846295)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 17179869184, -4484747666522931072)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 17179869184, -8715412538518264422)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 17179869184, -3521027846460785533)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 17179869184, -6534172152637582552)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 17179869184, 4735861352635655596)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 17179869184, -411508280321075475)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 17179869184, 2431932884271560631)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/adagrad.py:77: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Bypassing TPUEstimator hook
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:TPU job name worker
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/tpu/tpu_estimator.py:758: Variable.load (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Prefer Variable.assign which has equivalent behavior in 2.X.
INFO:tensorflow:Initialized dataset iterators in 0 seconds
INFO:tensorflow:Installing graceful shutdown hook.
INFO:tensorflow:Creating heartbeat manager for ['/job:worker/replica:0/task:0/device:CPU:0']
INFO:tensorflow:Configuring worker heartbeat: shutdown_mode: WAIT_FOR_COORDINATOR

INFO:tensorflow:Init TPU system
INFO:tensorflow:Initialized TPU in 7 seconds
INFO:tensorflow:Starting infeed thread controller.
INFO:tensorflow:Starting outfeed thread controller.
INFO:tensorflow:Enqueue next (1) batch(es) of data to infeed.
INFO:tensorflow:Dequeue next (1) batch(es) of data from outfeed.
INFO:tensorflow:Outfeed finished for iteration (0, 0)
INFO:tensorflow:loss = 4.462118, step = 1
INFO:tensorflow:Stop infeed thread controller
INFO:tensorflow:Shutting down InfeedController thread.
INFO:tensorflow:InfeedController received shutdown signal, stopping.
INFO:tensorflow:Infeed thread finished, shutting down.
INFO:tensorflow:infeed marked as finished
INFO:tensorflow:Stop output thread controller
INFO:tensorflow:Shutting down OutfeedController thread.
INFO:tensorflow:OutfeedController received shutdown signal, stopping.
INFO:tensorflow:Outfeed thread finished, shutting down.
INFO:tensorflow:outfeed marked as finished
INFO:tensorflow:Shutdown TPU system.
INFO:tensorflow:Loss for final step: 4.462118.
INFO:tensorflow:training_loop marked as finished
<tensorflow_estimator.python.estimator.tpu.tpu_estimator.TPUEstimator at 0x7fec59ef9d68>

Luego, llame a TPUEstimator.evaluate para evaluar el modelo utilizando los datos de evaluación:

estimator.evaluate(_eval_input_fn, steps=1)
INFO:tensorflow:Could not find trained model in model_dir: /tmp/tmp_bkua7zf, running initialization to evaluate.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/tpu/tpu_estimator.py:3406: div (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-02-05T13:15:25
INFO:tensorflow:TPU job name worker
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Init TPU system
INFO:tensorflow:Initialized TPU in 10 seconds
INFO:tensorflow:Starting infeed thread controller.
INFO:tensorflow:Starting outfeed thread controller.
INFO:tensorflow:Initialized dataset iterators in 0 seconds
INFO:tensorflow:Enqueue next (1) batch(es) of data to infeed.
INFO:tensorflow:Dequeue next (1) batch(es) of data from outfeed.
INFO:tensorflow:Outfeed finished for iteration (0, 0)
INFO:tensorflow:Evaluation [1/1]
INFO:tensorflow:Stop infeed thread controller
INFO:tensorflow:Shutting down InfeedController thread.
INFO:tensorflow:InfeedController received shutdown signal, stopping.
INFO:tensorflow:Infeed thread finished, shutting down.
INFO:tensorflow:infeed marked as finished
INFO:tensorflow:Stop output thread controller
INFO:tensorflow:Shutting down OutfeedController thread.
INFO:tensorflow:OutfeedController received shutdown signal, stopping.
INFO:tensorflow:Outfeed thread finished, shutting down.
INFO:tensorflow:outfeed marked as finished
INFO:tensorflow:Shutdown TPU system.
INFO:tensorflow:Inference Time : 10.80091s
INFO:tensorflow:Finished evaluation at 2022-02-05-13:15:36
INFO:tensorflow:Saving dict for global step 1: global_step = 1, loss = 116.58184
INFO:tensorflow:evaluation_loop marked as finished
{'loss': 116.58184, 'global_step': 1}

TensorFlow 2: Impulse un modelo en TPU con Keras Model.fit y TPUStrategy

En TensorFlow 2, para capacitar a los trabajadores de TPU, use tf.distribute.TPUStrategy junto con las API de Keras para la definición y capacitación/evaluación del modelo. (Consulte la guía Use TPU para obtener más ejemplos de entrenamiento con Keras Model.fit y un bucle de entrenamiento personalizado (con tf.function y tf.GradientTape ).)

Dado que debe realizar un trabajo de inicialización para conectarse al clúster remoto e inicializar los trabajadores de TPU, comience creando un TPUClusterResolver para proporcionar la información del clúster y conectarse al clúster. (Obtenga más información en la sección de inicialización de TPU de la guía Usar TPU ).

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Finished initializing TPU system.
All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]

Luego, una vez que sus datos estén preparados, creará una TPUStrategy , definirá un modelo, métricas y un optimizador bajo el alcance de esta estrategia.

Para lograr una velocidad de entrenamiento comparable con TPUStrategy , debe asegurarse de elegir un número para steps_per_execution en Model.compile porque especifica la cantidad de lotes que se ejecutarán durante cada llamada de tf.function y es fundamental para el rendimiento. Este argumento es similar a iterations_per_loop usado en un TPUEstimator . Si está utilizando bucles de entrenamiento personalizados, debe asegurarse de que se ejecuten varios pasos dentro de la función de entrenamiento tf.function -ed. Vaya a la sección Mejorar el rendimiento con varios pasos dentro de tf.function de la guía Usar TPU para obtener más información.

tf.distribute.TPUStrategy puede admitir formas dinámicas limitadas, en cuyo caso se puede inferir el límite superior del cálculo de la forma dinámica. Pero las formas dinámicas pueden presentar cierta sobrecarga de rendimiento en comparación con las formas estáticas. Por lo tanto, generalmente se recomienda hacer que sus formas de entrada sean estáticas si es posible, especialmente en el entrenamiento. Una operación común que devuelve una forma dinámica es tf.data.Dataset.batch(batch_size) , ya que la cantidad de muestras restantes en una transmisión puede ser menor que el tamaño del lote. Por lo tanto, cuando entrene en la TPU, debe usar tf.data.Dataset.batch(..., drop_remainder=True) para obtener el mejor rendimiento de entrenamiento.

dataset = tf.data.Dataset.from_tensor_slices(
    (features, labels)).shuffle(10).repeat().batch(
        8, drop_remainder=True).prefetch(2)
eval_dataset = tf.data.Dataset.from_tensor_slices(
    (eval_features, eval_labels)).batch(1, drop_remainder=True)

strategy = tf.distribute.TPUStrategy(cluster_resolver)
with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse", steps_per_execution=10)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

Con eso, está listo para entrenar el modelo con el conjunto de datos de entrenamiento:

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
10/10 [==============================] - 2s 151ms/step - loss: 0.0840
Epoch 2/5
10/10 [==============================] - 0s 3ms/step - loss: 9.6915e-04
Epoch 3/5
10/10 [==============================] - 0s 3ms/step - loss: 1.5100e-05
Epoch 4/5
10/10 [==============================] - 0s 3ms/step - loss: 2.3593e-07
Epoch 5/5
10/10 [==============================] - 0s 3ms/step - loss: 3.7059e-09
<keras.callbacks.History at 0x7fec58275438>

Finalmente, evalúe el modelo utilizando el conjunto de datos de evaluación:

model.evaluate(eval_dataset, return_dict=True)
1/1 [==============================] - 2s 2s/step - loss: 0.6127
{'loss': 0.6127181053161621}

Próximos pasos

Para obtener más información sobre TPUStrategy en TensorFlow 2, considere los siguientes recursos:

Para obtener más información sobre cómo personalizar su entrenamiento, consulte:

Los TPU, los ASIC especializados de Google para el aprendizaje automático, están disponibles a través de Google Colab , TPU Research Cloud y Cloud TPU .