Halaman ini diterjemahkan oleh Cloud Translation API.
Switch to English

Input Terdistribusi

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

API tf.distribute menyediakan cara mudah bagi pengguna untuk menskalakan pelatihan mereka dari satu mesin ke beberapa mesin. Saat menskalakan model mereka, pengguna juga harus mendistribusikan input mereka ke beberapa perangkat. tf.distribute menyediakan API yang dapat digunakan untuk mendistribusikan input Anda secara otomatis ke seluruh perangkat.

Panduan ini akan menunjukkan kepada Anda berbagai cara membuat kumpulan data dan iterator tf.distribute menggunakan API tf.distribute . Selain itu, topik berikut akan dibahas:

Panduan ini tidak mencakup penggunaan input terdistribusi dengan API Keras.

Set Data Terdistribusi

Untuk menggunakan API tf.distribute untuk menskalakan, disarankan agar pengguna menggunakantf.data.Dataset untuk mewakili masukan mereka. tf.distribute telah dibuat untuk bekerja secara efisien dengantf.data.Dataset (misalnya,tf.data.Dataset data otomatis ke setiap perangkat akselerator) dengan pengoptimalan kinerja yang secara teratur dimasukkan ke dalam implementasi. Jika Anda memiliki kasus penggunaan untuk menggunakan sesuatu selaintf.data.Dataset , lihat bagian selanjutnya dalam panduan ini. Dalam loop pelatihan yang tidak terdistribusi, pengguna pertama-tama membuat instancetf.data.Dataset lalu melakukan iterasi pada elemen. Sebagai contoh:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Untuk memungkinkan pengguna menggunakan strategi tf.distribute dengan sedikit perubahan pada kode milik pengguna, dua API diperkenalkan yang akan mendistribusikan instancetf.data.Dataset dan mengembalikan objektf.data.Dataset data terdistribusi. Seorang pengguna kemudian dapat mengulangi instance set data terdistribusi ini dan melatih model mereka seperti sebelumnya. Sekarang mari kita lihat dua API - tf.distribute.Strategy.experimental_distribute_dataset dan tf.distribute.Strategy.distribute_datasets_from_function secara lebih rinci:

tf.distribute.Strategy.experimental_distribute_dataset

Pemakaian

API ini mengambil instancetf.data.Dataset sebagai input dan mengembalikan instance tf.distribute.DistributedDataset . Anda harus mengumpulkan kumpulan data masukan dengan nilai yang sama dengan ukuran kumpulan global. Ukuran tumpukan global ini adalah jumlah sampel yang ingin Anda proses di semua perangkat dalam 1 langkah. Anda dapat melakukan iterasi atas kumpulan data terdistribusi ini dengan cara Pythonic atau membuat iterator menggunakan iter . Objek yang dikembalikan bukanlah instancetf.data.Dataset dan tidak mendukung API lain yang mengubah atau memeriksa kumpulan data dengan cara apa pun. Ini adalah API yang direkomendasikan jika Anda tidak memiliki cara khusus untuk membagi masukan Anda ke replika yang berbeda.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Properti

Batching

tf.distribute me-tf.data.Dataset instance inputtf.data.Dataset dengan ukuran batch baru yang sama dengan ukuran batch global dibagi dengan jumlah replika yang disinkronkan. Jumlah replika yang disinkronkan sama dengan jumlah perangkat yang mengambil bagian dalam pengurangan gradien selama pelatihan. Ketika pengguna memanggil next pada iterator terdistribusi, ukuran batch per replika data dikembalikan pada setiap replika. Kardinalitas set data ulang akan selalu berupa kelipatan dari jumlah replika. Berikut ini beberapa contohnya:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Tanpa distribusi:
    • Tumpukan 1: [0, 1, 2, 3]
    • Tumpukan 2: [4, 5]
    • Dengan distribusi lebih dari 2 replika. Batch terakhir ([4, 5]) dibagi menjadi 2 replika.

    • Batch 1:

      • Replika 1: [0, 1]
      • Replika 2: [2, 3]
    • Batch 2:

      • Replika 2: [4]
      • Replika 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Tanpa distribusi:
    • Tumpukan 1: [[0], [1], [2], [3]]
    • Dengan distribusi lebih dari 5 replika:
    • Batch 1:
      • Replika 1: [0]
      • Replika 2: [1]
      • Replika 3: [2]
      • Replika 4: [3]
      • Replika 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Tanpa distribusi:
    • Tumpukan 1: [0, 1, 2, 3]
    • Tumpukan 2: [4, 5, 6, 7]
    • Dengan distribusi lebih dari 3 replika:
    • Batch 1:
      • Replika 1: [0, 1]
      • Replika 2: [2, 3]
      • Replika 3: []
    • Batch 2:
      • Replika 1: [4, 5]
      • Replika 2: [6, 7]
      • Replika 3: []

Rebatching dataset memiliki kompleksitas ruang yang meningkat secara linier dengan jumlah replika. Ini berarti bahwa untuk kasus penggunaan pelatihan multi pekerja, pipeline input dapat mengalami error OOM.

Sharding

tf.distribute juga melakukan tf.distribute data input dalam pelatihan multi pekerja dengan MultiWorkerMirroredStrategy dan TPUStrategy . Setiap set data dibuat di perangkat CPU pekerja. Melakukan autosharding pada set data pada satu set pekerja berarti bahwa setiap pekerja diberi subset dari seluruh set data (jika tf.data.experimental.AutoShardPolicy benar disetel). Ini untuk memastikan bahwa di setiap langkah, ukuran batch global dari elemen set data yang tidak tumpang tindih akan diproses oleh setiap pekerja. Autosharding memiliki beberapa opsi berbeda yang dapat ditentukan menggunakan tf.data.experimental.DistributeOptions . Perhatikan bahwa tidak ada autosharding dalam pelatihan multi pekerja dengan ParameterServerStrategy , dan informasi lebih lanjut tentang pembuatan set data dengan strategi ini dapat ditemukan di tutorial Strategi Server Parameter .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

Ada tiga opsi berbeda yang dapat Anda tetapkan untuk tf.data.experimental.AutoShardPolicy :

  • OTOMATIS: Ini adalah opsi default yang berarti upaya akan dilakukan untuk memecah belah oleh FILE. Upaya untuk memecah menurut FILE gagal jika kumpulan data berbasis file tidak terdeteksi. tf.distribute kemudian akan kembali ke sharding oleh DATA. Perhatikan bahwa jika InvalidArgumentError data masukan adalah berbasis file tetapi jumlah file lebih sedikit dari jumlah pekerja, InvalidArgumentError akan dimunculkan. Jika ini terjadi, setel kebijakan secara eksplisit ke AutoShardPolicy.DATA , atau bagi sumber input Anda menjadi file yang lebih kecil sehingga jumlah file lebih banyak daripada jumlah pekerja.
  • FILE: Ini adalah opsi jika Anda ingin membagi file input ke semua pekerja. Anda harus menggunakan opsi ini jika jumlah file input jauh lebih besar daripada jumlah pekerja dan data dalam file didistribusikan secara merata. Kelemahan dari opsi ini adalah memiliki pekerja yang menganggur jika data dalam file tidak didistribusikan secara merata. Jika jumlah file kurang dari jumlah pekerja, InvalidArgumentError akan dimunculkan. Jika ini terjadi, setel kebijakan secara eksplisit ke AutoShardPolicy.DATA . Misalnya, mari kita mendistribusikan 2 file ke 2 pekerja dengan masing-masing 1 replika. File 1 berisi [0, 1, 2, 3, 4, 5] dan File 2 berisi [6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2 dan ukuran kumpulan global menjadi 4.

    • Pekerja 0:
    • Tumpukan 1 = Replika 1: [0, 1]
    • Tumpukan 2 = Replika 1: [2, 3]
    • Gelombang 3 = Replika 1: [4]
    • Tumpukan 4 = Replika 1: [5]
    • Pekerja 1:
    • Tumpukan 1 = Replika 2: [6, 7]
    • Tumpukan 2 = Replika 2: [8, 9]
    • Angkatan 3 = Replika 2: [10]
    • Tumpukan 4 = Replika 2: [11]
  • DATA: Ini akan menghapus otomatis elemen di semua pekerja. Setiap pekerja akan membaca seluruh kumpulan data dan hanya memproses pecahan yang ditetapkan untuknya. Semua pecahan lainnya akan dibuang. Ini biasanya digunakan jika jumlah file input kurang dari jumlah pekerja dan Anda ingin sharding data yang lebih baik di semua pekerja. Sisi negatifnya adalah seluruh dataset akan dibaca pada setiap pekerja. Misalnya, mari kita mendistribusikan 1 file ke 2 pekerja. File 1 berisi [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2.

    • Pekerja 0:
    • Tumpukan 1 = Replika 1: [0, 1]
    • Tumpukan 2 = Replika 1: [4, 5]
    • Kelompok 3 = Replika 1: [8, 9]
    • Pekerja 1:
    • Tumpukan 1 = Replika 2: [2, 3]
    • Tumpukan 2 = Replika 2: [6, 7]
    • Angkatan 3 = Replika 2: [10, 11]
  • OFF: Jika Anda menonaktifkan autosharding, setiap pekerja akan memproses semua data. Misalnya, mari kita mendistribusikan 1 file ke 2 pekerja. File 1 berisi [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2. Kemudian setiap pekerja akan melihat distribusi berikut:

    • Pekerja 0:
    • Tumpukan 1 = Replika 1: [0, 1]
    • Tumpukan 2 = Replika 1: [2, 3]
    • Tumpukan 3 = Replika 1: [4, 5]
    • Tumpukan 4 = Replika 1: [6, 7]
    • Tumpukan 5 = Replika 1: [8, 9]
    • Tumpukan 6 = Replika 1: [10, 11]

    • Pekerja 1:

    • Tumpukan 1 = Replika 2: [0, 1]

    • Tumpukan 2 = Replika 2: [2, 3]

    • Gelombang 3 = Replika 2: [4, 5]

    • Tumpukan 4 = Replika 2: [6, 7]

    • Tumpukan 5 = Replika 2: [8, 9]

    • Angkatan 6 = Replika 2: [10, 11]

Mengambil lebih dulu

Secara default, tf.distribute menambahkan transformasi prefetch di akhir instancetf.data.Dataset disediakan pengguna. Argumen untuk transformasi prefetch yaitu buffer_size sama dengan jumlah replika yang disinkronkan.

tf.distribute.Strategy.distribute_datasets_from_function

Pemakaian

API ini mengambil fungsi input dan mengembalikan instance tf.distribute.DistributedDataset . Fungsi input yang diteruskan pengguna memiliki argumen tf.distribute.InputContext dan harus mengembalikan instancetf.data.Dataset . Dengan API ini, tf.distribute tidak membuat perubahan apa pun lebih lanjut ke instancetf.data.Dataset pengguna yang dikembalikan dari fungsi input. Merupakan tanggung jawab pengguna untuk mengumpulkan dan memecah kumpulan data. tf.distribute memanggil fungsi input pada perangkat CPU masing-masing pekerja. Selain memungkinkan pengguna untuk menentukan logika batching dan sharding mereka sendiri, API ini juga menunjukkan skalabilitas dan performa yang lebih baik dibandingkan dengan tf.distribute.Strategy.experimental_distribute_dataset saat digunakan untuk pelatihan multi pekerja.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Properti

Batching

tf.data.Dataset yang merupakan nilai kembalian dari fungsi input harus di-batch menggunakan ukuran batch per replika. Ukuran tumpukan per replika adalah ukuran tumpukan global dibagi dengan jumlah replika yang mengambil bagian dalam pelatihan sinkronisasi. Ini karena tf.distribute memanggil fungsi input pada perangkat CPU masing-masing pekerja. Kumpulan data yang dibuat pada pekerja tertentu harus siap digunakan oleh semua replika pekerja tersebut.

Sharding

Objek tf.distribute.InputContext yang secara implisit diteruskan sebagai argumen ke fungsi input pengguna dibuat oleh tf.distribute bawah tenda. Ini memiliki informasi tentang jumlah pekerja, id pekerja saat ini, dll. Fungsi input ini dapat menangani sharding sesuai kebijakan yang ditetapkan oleh pengguna menggunakan properti ini yang merupakan bagian dari objek tf.distribute.InputContext .

Mengambil lebih dulu

tf.distribute tidak menambahkan transformasi prefetch di akhirtf.data.Dataset dikembalikan oleh fungsi input yang disediakan pengguna.

Iterator Terdistribusi

Mirip dengan instancetf.data.Dataset terdistribusi, Anda perlu membuat iterator pada instance tf.distribute.DistributedDataset untuk mengulanginya dan mengakses elemen di tf.distribute.DistributedDataset . Berikut ini adalah cara-cara untuk membuat tf.distribute.DistributedIterator dan menggunakannya untuk melatih model Anda:

Penggunaan

Gunakan Pythonic untuk konstruksi loop

Anda dapat menggunakan loop Pythonic yang ramah pengguna untuk melakukan iterasi pada tf.distribute.DistributedDataset . Elemen yang dikembalikan dari tf.distribute.DistributedIterator dapat berupa satu tf.Tensor atau tf.distribute.DistributedValues yang berisi nilai per replika. Menempatkan loop di dalam fungsi tf.function akan memberikan peningkatan kinerja. Namun, break dan return saat ini tidak didukung untuk loop di atas tf.distribute.DistributedDataset yang ditempatkan di dalam tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Gunakan iter untuk membuat iterator eksplisit

Untuk mengulang elemen dalam instance tf.distribute.DistributedDataset , Anda dapat membuat tf.distribute.DistributedIterator menggunakan iter API di atasnya. Dengan iterator eksplisit, Anda dapat melakukan iterasi untuk sejumlah langkah tetap. Dalam rangka untuk mendapatkan elemen berikutnya dari tf.distribute.DistributedIterator contoh dist_iterator , Anda dapat menghubungi next(dist_iterator) , dist_iterator.get_next() , atau dist_iterator.get_next_as_optional() . Dua yang pertama pada dasarnya sama:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

Dengan next() atau tf.distribute.DistributedIterator.get_next() , jika tf.distribute.DistributedIterator telah mencapai akhir, kesalahan OutOfRange akan muncul. Klien dapat menangkap kesalahan di sisi python dan terus melakukan pekerjaan lain seperti checkpointing dan evaluasi. Namun, ini tidak akan berfungsi jika Anda menggunakan loop pelatihan host (yaitu, menjalankan beberapa langkah per tf.function ), yang terlihat seperti:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn berisi beberapa langkah dengan membungkus badan langkah di dalam tf.range . Dalam kasus ini, iterasi yang berbeda dalam loop tanpa ketergantungan dapat dimulai secara paralel, sehingga kesalahan OutOfRange dapat dipicu di iterasi selanjutnya sebelum komputasi iterasi sebelumnya selesai. Setelah kesalahan OutOfRange terjadi, semua operasi dalam fungsi tersebut akan segera dihentikan. Jika ini adalah kasus yang ingin Anda hindari, alternatif yang tidak memunculkan kesalahan tf.distribute.DistributedIterator.get_next_as_optional() adalah tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional mengembalikan tf.experimental.Optional yang berisi elemen berikutnya atau tidak ada nilai jika tf.distribute.DistributedIterator telah mencapai akhir.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Menggunakan properti element_spec

Jika Anda meneruskan elemen tf.function data terdistribusi ke tf.function dan menginginkan jaminan tf.TypeSpec , Anda dapat menentukan argumen input_signature dari tf.function . Output dari dataset terdistribusi adalah tf.distribute.DistributedValues yang dapat merepresentasikan input ke satu perangkat atau beberapa perangkat. Untuk mendapatkan tf.TypeSpec sesuai dengan nilai terdistribusi ini, Anda dapat menggunakan properti element_spec dari kumpulan data terdistribusi atau objek iterator terdistribusi.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Kelompok Parsial

Batch parsial dijumpai saat instancetf.data.Dataset yang dibuat pengguna mungkin berisi ukuran batch yang tidak dapat dibagi secara merata dengan jumlah replika atau jika kardinalitas instanstf.data.Dataset data tidak dapat dibagi oleh ukuran batch. Ini berarti bahwa ketika kumpulan data didistribusikan ke beberapa replika, panggilan next pada beberapa iterator akan menghasilkan OutOfRangeError. Untuk menangani kasus penggunaan ini, tf.distribute mengembalikan kumpulan dummy ukuran batch 0 pada replika yang tidak memiliki data lagi untuk diproses.

Untuk kasus pekerja tunggal, jika data tidak dikembalikan oleh panggilan next pada iterator, kumpulan dummy 0 ukuran batch dibuat dan digunakan bersama dengan data nyata dalam set data. Dalam kasus kumpulan parsial, kumpulan data global terakhir akan berisi data nyata bersama kumpulan data tiruan. Kondisi berhenti untuk memproses data sekarang memeriksa apakah ada replika yang memiliki data. Jika tidak ada data di salah satu replika, kesalahan OutOfRange akan muncul.

Untuk kasus multi pekerja, nilai boolean yang mewakili keberadaan data pada setiap pekerja digabungkan menggunakan komunikasi replika silang dan ini digunakan untuk mengidentifikasi apakah semua pekerja telah selesai memproses kumpulan data terdistribusi. Karena ini melibatkan komunikasi lintas pekerja, ada beberapa hukuman kinerja yang terlibat.

Peringatan

  • Saat menggunakan API tf.distribute.Strategy.experimental_distribute_dataset dengan pengaturan banyak pekerja, pengguna meneruskantf.data.Dataset yang membaca dari file. Jika tf.data.experimental.AutoShardPolicy disetel ke AUTO atau FILE , ukuran kumpulan per langkah sebenarnya mungkin lebih kecil daripada ukuran kumpulan global yang ditentukan pengguna. Ini bisa terjadi jika elemen yang tersisa di file lebih kecil dari ukuran batch global. Pengguna dapat menghabiskan kumpulan data tanpa bergantung pada jumlah langkah untuk menjalankan atau menyetel tf.data.experimental.AutoShardPolicy ke DATA untuk mengatasinya.

  • Transformasi tf.distribute data stateful saat ini tidak didukung dengan tf.distribute dan operasi stateful apa pun yang mungkin dimiliki dataset saat ini diabaikan. Misalnya, jika kumpulan data Anda memiliki map_fn yang menggunakan tf.random.uniform untuk memutar gambar, maka Anda memiliki grafik kumpulan data yang bergantung pada status (yaitu benih acak) pada mesin lokal tempat proses python dijalankan.

  • Eksperimental tf.data.experimental.OptimizationOptions yang dinonaktifkan secara default dapat dalam konteks tertentu - seperti ketika digunakan bersama dengan tf.distribute - menyebabkan penurunan kinerja. Anda hanya boleh mengaktifkannya setelah Anda memvalidasi bahwa mereka menguntungkan kinerja beban kerja Anda dalam pengaturan distribusi.

  • Silakan merujuk ke panduan ini untuk mengetahui cara mengoptimalkan pipeline input Anda dengan tf.data secara umum. Beberapa tip tambahan:

    • Jika Anda memiliki banyak pekerja dan menggunakan tf.data.Dataset.list_files untuk membuat kumpulan data dari semua file yang cocok dengan satu atau beberapa pola glob, ingatlah untuk menyetel argumen seed atau setel shuffle=False sehingga setiap pekerja memecah file secara konsisten.

    • Jika pipeline input Anda menyertakan pengacakan data pada level record dan penguraian data, kecuali data yang tidak diurai secara signifikan lebih besar dari data yang diurai (yang biasanya bukan kasusnya), acak dulu lalu parse, seperti yang ditampilkan dalam contoh berikut. Ini dapat menguntungkan penggunaan dan kinerja memori.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) mempertahankan buffer internal elemen buffer_size , dan dengan demikian mengurangi buffer_size dapat mengurangi masalah OOM.

  • Urutan pemrosesan data oleh pekerja saat menggunakan tf.distribute.experimental_distribute_dataset atau tf.distribute.distribute_datasets_from_function tidak dijamin. Ini biasanya diperlukan jika Anda menggunakan tf.distribute to scale prediksi. Namun Anda dapat menyisipkan indeks untuk setiap elemen dalam batch dan mengurutkan keluarannya sesuai dengan itu. Cuplikan berikut adalah contoh cara mengurutkan keluaran.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Bagaimana cara mendistribusikan data saya jika saya tidak menggunakan instance canonical tf.data.Dataset?

Terkadang pengguna tidak dapat menggunakantf.data.Dataset untuk mewakili masukan mereka dan selanjutnya API yang disebutkan di atas untuk mendistribusikantf.data.Dataset data ke beberapa perangkat. Dalam kasus seperti itu, Anda dapat menggunakan tensor atau input mentah dari generator.

Gunakan fungsi_distribusi_perimental_fungsi_perimental untuk input tensor arbitrer

strategy.run menerima tf.distribute.DistributedValues yang merupakan output dari next(iterator) . Untuk meneruskan nilai tensor, gunakan experimental_distribute_values_from_function untuk membuat tf.distribute.DistributedValues dari tensor mentah.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Gunakan tf.data.Dataset.from_generator jika masukan Anda dari generator

Jika Anda memiliki fungsi generator yang ingin digunakan, Anda dapat membuat instancetf.data.Dataset menggunakan API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)