Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Użyj TPU

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Eksperymentalne wsparcie dla Cloud TPU jest obecnie dostępne dla Keras i Google Colab. Przed uruchomieniem tych notebooków Colab upewnij się, że akcelerator sprzętowy jest TPU, sprawdzając ustawienia notebooka: Czas wykonywania> Zmień typ środowiska wykonawczego> Akcelerator sprzętowy> TPU.

Ustawiać

 import tensorflow as tf

import os
import tensorflow_datasets as tfds
 

Inicjalizacja TPU

Jednostki TPU znajdują się zwykle na pracownikach Cloud TPU, które różnią się od lokalnego procesu, w którym działa program użytkownika w języku Python. W związku z tym należy wykonać pewne prace inicjujące, aby połączyć się ze zdalnym klastrem i zainicjować jednostki TPU. Zauważ, że argument tpu dla TPUClusterResolver to specjalny adres tylko dla Colab. Jeśli korzystasz z Google Compute Engine (GCE), powinieneś zamiast tego podać nazwę swojego CloudTPU.

 resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
 
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470

INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470

INFO:tensorflow:Clearing out eager caches

INFO:tensorflow:Clearing out eager caches

INFO:tensorflow:Finished initializing TPU system.

INFO:tensorflow:Finished initializing TPU system.

All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU')]

Ręczne umieszczanie urządzenia

Po zainicjowaniu TPU można użyć ręcznego umieszczania urządzenia, aby umieścić obliczenia na jednym urządzeniu TPU.

 a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
  c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
 
c device:  /job:worker/replica:0/task:0/device:TPU:0
tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)

Strategie dystrybucji

W większości przypadków użytkownicy chcą uruchomić model na wielu TPU w sposób równoległy do ​​danych. Strategia dystrybucji to abstrakcja, której można używać do sterowania modelami na procesorach CPU, GPU lub TPU. Wystarczy zamienić strategię dystrybucji, a model będzie działał na danym urządzeniu. Więcej informacji można znaleźć w przewodniku po strategii dystrybucji .

Najpierw tworzy obiekt TPUStrategy .

 strategy = tf.distribute.TPUStrategy(resolver)
 
INFO:tensorflow:Found TPU system:

INFO:tensorflow:Found TPU system:

INFO:tensorflow:*** Num TPU Cores: 8

INFO:tensorflow:*** Num TPU Cores: 8

INFO:tensorflow:*** Num TPU Workers: 1

INFO:tensorflow:*** Num TPU Workers: 1

INFO:tensorflow:*** Num TPU Cores Per Worker: 8

INFO:tensorflow:*** Num TPU Cores Per Worker: 8

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

Aby zreplikować obliczenia tak, aby działały na wszystkich rdzeniach TPU, możesz po prostu przekazać je do API strategy.run . Poniżej znajduje się przykład, że wszystkie rdzenie otrzymają te same dane wejściowe (a, b) i wykonają matmul na każdym rdzeniu niezależnie. Dane wyjściowe będą wartościami ze wszystkich replik.

 @tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
 
PerReplica:{
  0: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  1: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  2: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  3: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  4: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  5: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  6: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  7: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
}

Klasyfikacja na TPU

Ponieważ poznaliśmy podstawowe pojęcia, czas przyjrzeć się bardziej konkretnemu przykładowi. Ten przewodnik pokazuje, jak używać strategii dystrybucji tf.distribute.experimental.TPUStrategy do obsługi Cloud TPU i trenowania modelu Keras.

Zdefiniuj model Keras

Poniżej znajduje się definicja modelu MNIST używającego Keras, niezmieniona od tej, której używałbyś na CPU lub GPU. Pamiętaj, że tworzenie modelu Keras musi odbywać się w ramach strategy.scope , aby zmienne można było tworzyć na każdym urządzeniu TPU. Inne części kodu nie muszą znajdować się w zakresie strategii.

 def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])
 

Wejściowe zbiory danych

Efektywne wykorzystanie interfejsu API tf.data.Dataset ma kluczowe znaczenie podczas korzystania z Cloud TPU, ponieważ niemożliwe jest korzystanie z Cloud TPU, jeśli nie można wystarczająco szybko dostarczyć im danych. Zobacz Przewodnik wydajności potoku wejściowego, aby uzyskać szczegółowe informacje na temat wydajności zestawu danych.

W przypadku wszystkich eksperymentów z tf.data.Dataset.from_tensor_slices najprostszych (przy użyciu tf.data.Dataset.from_tensor_slices lub innych danych na wykresie) musisz przechowywać wszystkie pliki danych odczytywane przez zestaw danych w zasobnikach Google Cloud Storage (GCS).

W większości przypadków zaleca się przekonwertowanie danych do formatu TFRecord i użycie tf.data.TFRecordDataset do ich odczytania. Zobacz samouczek TFRecord i tf.Example, aby dowiedzieć się, jak to zrobić. Nie jest to jednak trudne wymaganie i jeśli wolisz, możesz użyć innych czytników FixedLengthRecordDataset danych ( FixedLengthRecordDataset lub TextLineDataset ).

Małe zestawy danych można w całości załadować do pamięci za pomocą tf.data.Dataset.cache .

Niezależnie od używanego formatu danych zdecydowanie zaleca się używanie dużych plików, rzędu 100 MB. Jest to szczególnie ważne w przypadku tego ustawienia sieciowego, ponieważ koszt otwarcia pliku jest znacznie wyższy.

W tym miejscu należy użyć modułu tensorflow_datasets , aby uzyskać kopię danych szkoleniowych MNIST. Zauważ, że try_gcs jest określony tak, aby używać kopii dostępnej w publicznym zasobniku GCS. Jeśli tego nie określisz, TPU nie będzie mieć dostępu do pobieranych danych.

 def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0

    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage to have a
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so users don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset
 

Wytrenuj model przy użyciu interfejsów API wysokiego poziomu Keras

Model można po prostu wytrenować za pomocą interfejsów API fit / compile Keras. Nic tutaj nie jest specyficzne dla TPU, napisałbyś ten sam kod poniżej, gdybyś miał wiele procesorów graficznych i korzystał z MirroredStrategy zamiast TPUStrategy . Aby dowiedzieć się więcej, zapoznaj się z samouczkiem Rozproszone szkolenie z Kerasem .

 with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset, 
          validation_steps=validation_steps)
 
Epoch 1/5
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

  1/300 [..............................] - ETA: 20:25 - loss: 2.3062 - sparse_categorical_accuracy: 0.0500WARNING:tensorflow:Callbacks method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0015s vs `on_train_batch_end` time: 0.0237s). Check your callbacks.

Warning:tensorflow:Callbacks method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0015s vs `on_train_batch_end` time: 0.0237s). Check your callbacks.

300/300 [==============================] - ETA: 0s - loss: 0.1450 - sparse_categorical_accuracy: 0.9561WARNING:tensorflow:Callbacks method `on_test_batch_end` is slow compared to the batch time (batch time: 0.0013s vs `on_test_batch_end` time: 0.0097s). Check your callbacks.

Warning:tensorflow:Callbacks method `on_test_batch_end` is slow compared to the batch time (batch time: 0.0013s vs `on_test_batch_end` time: 0.0097s). Check your callbacks.

300/300 [==============================] - 15s 50ms/step - loss: 0.1450 - sparse_categorical_accuracy: 0.9561 - val_loss: 0.0456 - val_sparse_categorical_accuracy: 0.9852
Epoch 2/5
300/300 [==============================] - 8s 28ms/step - loss: 0.0357 - sparse_categorical_accuracy: 0.9890 - val_loss: 0.0372 - val_sparse_categorical_accuracy: 0.9884
Epoch 3/5
300/300 [==============================] - 9s 28ms/step - loss: 0.0193 - sparse_categorical_accuracy: 0.9940 - val_loss: 0.0557 - val_sparse_categorical_accuracy: 0.9835
Epoch 4/5
300/300 [==============================] - 9s 29ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9954 - val_loss: 0.0405 - val_sparse_categorical_accuracy: 0.9883
Epoch 5/5
300/300 [==============================] - 9s 29ms/step - loss: 0.0092 - sparse_categorical_accuracy: 0.9967 - val_loss: 0.0428 - val_sparse_categorical_accuracy: 0.9887

<tensorflow.python.keras.callbacks.History at 0x7f480c793240>

Aby zmniejszyć obciążenie Pythona i zmaksymalizować wydajność TPU, wypróbować eksperymentalną experimental_steps_per_execution argument Model.compile . Tutaj zwiększa przepustowość o około 50%:

 with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                experimental_steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
 
Epoch 1/5
300/300 [==============================] - 16s 52ms/step - loss: 0.1337 - sparse_categorical_accuracy: 0.9583 - val_loss: 0.0521 - val_sparse_categorical_accuracy: 0.9840
Epoch 2/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0331 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0360 - val_sparse_categorical_accuracy: 0.9884
Epoch 3/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0188 - sparse_categorical_accuracy: 0.9939 - val_loss: 0.0405 - val_sparse_categorical_accuracy: 0.9887
Epoch 4/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0117 - sparse_categorical_accuracy: 0.9961 - val_loss: 0.0808 - val_sparse_categorical_accuracy: 0.9820
Epoch 5/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0119 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0488 - val_sparse_categorical_accuracy: 0.9862

<tensorflow.python.keras.callbacks.History at 0x7f47ac791240>

Wytrenuj model za pomocą niestandardowej pętli treningowej.

Możesz także tworzyć i trenować swoje modele bezpośrednio za pomocą tf.function i tf.distribute API. strategy.experimental_distribute_datasets_from_function API jest używane do dystrybucji zestawu danych z określoną funkcją zestawu danych. Należy zauważyć, że rozmiar partii przekazanej do zestawu danych będzie w tym przypadku określany jako rozmiar partii repliki, a nie globalny rozmiar partii. Aby dowiedzieć się więcej, zapoznaj się z samouczkiem Niestandardowe szkolenie z tf.distribute.Strategy .

Najpierw utwórz model, zbiory danych i tf.functions.

 # Create the model, optimizer and metrics inside strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step"""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
 

Następnie uruchom pętlę treningową.

 steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
 
Epoch: 0/5
Current step: 300, training loss: 0.1335, accuracy: 95.8%
Epoch: 1/5
Current step: 600, training loss: 0.0344, accuracy: 98.93%
Epoch: 2/5
Current step: 900, training loss: 0.0196, accuracy: 99.35%
Epoch: 3/5
Current step: 1200, training loss: 0.0119, accuracy: 99.61%
Epoch: 4/5
Current step: 1500, training loss: 0.0102, accuracy: 99.65%

Poprawa wydajności w wielu krokach w tf.function

Wydajność można poprawić, uruchamiając wiele kroków w ramach funkcji tf.function . Osiąga się to poprzez opakowanie wywołania strategy.run z tf.range wewnątrz tf.function , AutoGraph przekonwertuje je na tf.while_loop w tf.while_loop TPU.

Chociaż przy lepszej wydajności, istnieją kompromisy w porównaniu z pojedynczym krokiem wewnątrz funkcji tf.function . Uruchamianie wielu kroków w funkcji tf.function jest mniej elastyczne, nie można w nich uruchamiać rzeczy chętnie ani dowolnego kodu Pythona.

 @tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step"""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get 
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
 
Current step: 1800, training loss: 0.0087, accuracy: 99.72%

Następne kroki