![]() | ![]() | ![]() | ![]() |
API tf.distribute menyediakan cara mudah bagi pengguna untuk menskalakan pelatihan mereka dari satu mesin ke beberapa mesin. Saat menskalakan model mereka, pengguna juga harus mendistribusikan input mereka ke beberapa perangkat. tf.distribute
menyediakan API yang dapat digunakan untuk mendistribusikan input Anda secara otomatis ke seluruh perangkat.
Panduan ini akan menunjukkan kepada Anda berbagai cara membuat kumpulan data dan iterator tf.distribute
menggunakan API tf.distribute
. Selain itu, topik berikut akan dibahas:
- Opsi penggunaan, sharding, dan batching saat menggunakan
tf.distribute.Strategy.experimental_distribute_dataset
dantf.distribute.Strategy.distribute_datasets_from_function
. - Berbagai cara untuk melakukan iterasi atas kumpulan data terdistribusi.
- Perbedaan antara
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API dantf.data
API serta keterbatasan yang pengguna dapat menemukan dalam penggunaan mereka.
Panduan ini tidak mencakup penggunaan input terdistribusi dengan API Keras.
Set Data Terdistribusi
Untuk menggunakan API tf.distribute
untuk menskalakan, disarankan agar pengguna menggunakantf.data.Dataset
untuk mewakili masukan mereka. tf.distribute
telah dibuat untuk bekerja secara efisien dengantf.data.Dataset
(misalnya,tf.data.Dataset
data otomatis ke setiap perangkat akselerator) dengan pengoptimalan kinerja yang secara teratur dimasukkan ke dalam implementasi. Jika Anda memiliki kasus penggunaan untuk menggunakan sesuatu selaintf.data.Dataset
, lihat bagian selanjutnya dalam panduan ini. Dalam loop pelatihan yang tidak terdistribusi, pengguna pertama-tama membuat instancetf.data.Dataset
lalu melakukan iterasi pada elemen. Sebagai contoh:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.4.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Untuk memungkinkan pengguna menggunakan strategi tf.distribute
dengan sedikit perubahan pada kode milik pengguna, dua API diperkenalkan yang akan mendistribusikan instancetf.data.Dataset
dan mengembalikan objektf.data.Dataset
data terdistribusi. Seorang pengguna kemudian dapat mengulangi instance set data terdistribusi ini dan melatih model mereka seperti sebelumnya. Sekarang mari kita lihat dua API - tf.distribute.Strategy.experimental_distribute_dataset
dan tf.distribute.Strategy.distribute_datasets_from_function
secara lebih rinci:
tf.distribute.Strategy.experimental_distribute_dataset
Pemakaian
API ini mengambil instancetf.data.Dataset
sebagai input dan mengembalikan instance tf.distribute.DistributedDataset
. Anda harus mengumpulkan kumpulan data masukan dengan nilai yang sama dengan ukuran kumpulan global. Ukuran tumpukan global ini adalah jumlah sampel yang ingin Anda proses di semua perangkat dalam 1 langkah. Anda dapat melakukan iterasi atas kumpulan data terdistribusi ini dengan cara Pythonic atau membuat iterator menggunakan iter
. Objek yang dikembalikan bukanlah instancetf.data.Dataset
dan tidak mendukung API lain yang mengubah atau memeriksa kumpulan data dengan cara apa pun. Ini adalah API yang direkomendasikan jika Anda tidak memiliki cara khusus untuk membagi masukan Anda ke replika yang berbeda.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>)
Properti
Batching
tf.distribute
me-tf.data.Dataset
instance inputtf.data.Dataset
dengan ukuran batch baru yang sama dengan ukuran batch global dibagi dengan jumlah replika yang disinkronkan. Jumlah replika yang disinkronkan sama dengan jumlah perangkat yang mengambil bagian dalam pengurangan gradien selama pelatihan. Ketika pengguna memanggil next
pada iterator terdistribusi, ukuran batch per replika data dikembalikan pada setiap replika. Kardinalitas set data ulang akan selalu berupa kelipatan dari jumlah replika. Berikut ini beberapa contohnya:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Tanpa distribusi:
- Tumpukan 1: [0, 1, 2, 3]
- Tumpukan 2: [4, 5]
Dengan distribusi lebih dari 2 replika. Batch terakhir ([4, 5]) dibagi menjadi 2 replika.
Batch 1:
- Replika 1: [0, 1]
- Replika 2: [2, 3]
Batch 2:
- Replika 2: [4]
- Replika 2: [5]
tf.data.Dataset.range(4).batch(4)
- Tanpa distribusi:
- Tumpukan 1: [[0], [1], [2], [3]]
- Dengan distribusi lebih dari 5 replika:
- Batch 1:
- Replika 1: [0]
- Replika 2: [1]
- Replika 3: [2]
- Replika 4: [3]
- Replika 5: []
tf.data.Dataset.range(8).batch(4)
- Tanpa distribusi:
- Tumpukan 1: [0, 1, 2, 3]
- Tumpukan 2: [4, 5, 6, 7]
- Dengan distribusi lebih dari 3 replika:
- Batch 1:
- Replika 1: [0, 1]
- Replika 2: [2, 3]
- Replika 3: []
- Batch 2:
- Replika 1: [4, 5]
- Replika 2: [6, 7]
- Replika 3: []
Rebatching dataset memiliki kompleksitas ruang yang meningkat secara linier dengan jumlah replika. Ini berarti bahwa untuk kasus penggunaan pelatihan multi pekerja, pipeline input dapat mengalami error OOM.
Sharding
tf.distribute
juga melakukan tf.distribute
data input dalam pelatihan multi pekerja dengan MultiWorkerMirroredStrategy
dan TPUStrategy
. Setiap set data dibuat di perangkat CPU pekerja. Melakukan autosharding pada set data pada satu set pekerja berarti bahwa setiap pekerja diberi subset dari seluruh set data (jika tf.data.experimental.AutoShardPolicy
benar disetel). Ini untuk memastikan bahwa di setiap langkah, ukuran batch global dari elemen set data yang tidak tumpang tindih akan diproses oleh setiap pekerja. Autosharding memiliki beberapa opsi berbeda yang dapat ditentukan menggunakan tf.data.experimental.DistributeOptions
. Perhatikan bahwa tidak ada autosharding dalam pelatihan multi pekerja dengan ParameterServerStrategy
, dan informasi lebih lanjut tentang pembuatan set data dengan strategi ini dapat ditemukan di tutorial Strategi Server Parameter .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Ada tiga opsi berbeda yang dapat Anda tetapkan untuk tf.data.experimental.AutoShardPolicy
:
- OTOMATIS: Ini adalah opsi default yang berarti upaya akan dilakukan untuk memecah belah oleh FILE. Upaya untuk memecah menurut FILE gagal jika kumpulan data berbasis file tidak terdeteksi.
tf.distribute
kemudian akan kembali ke sharding oleh DATA. Perhatikan bahwa jikaInvalidArgumentError
data masukan adalah berbasis file tetapi jumlah file lebih sedikit dari jumlah pekerja,InvalidArgumentError
akan dimunculkan. Jika ini terjadi, setel kebijakan secara eksplisit keAutoShardPolicy.DATA
, atau bagi sumber input Anda menjadi file yang lebih kecil sehingga jumlah file lebih banyak daripada jumlah pekerja. FILE: Ini adalah opsi jika Anda ingin membagi file input ke semua pekerja. Anda harus menggunakan opsi ini jika jumlah file input jauh lebih besar daripada jumlah pekerja dan data dalam file didistribusikan secara merata. Kelemahan dari opsi ini adalah memiliki pekerja yang menganggur jika data dalam file tidak didistribusikan secara merata. Jika jumlah file kurang dari jumlah pekerja,
InvalidArgumentError
akan dimunculkan. Jika ini terjadi, setel kebijakan secara eksplisit keAutoShardPolicy.DATA
. Misalnya, mari kita mendistribusikan 2 file ke 2 pekerja dengan masing-masing 1 replika. File 1 berisi [0, 1, 2, 3, 4, 5] dan File 2 berisi [6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2 dan ukuran kumpulan global menjadi 4.- Pekerja 0:
- Tumpukan 1 = Replika 1: [0, 1]
- Tumpukan 2 = Replika 1: [2, 3]
- Gelombang 3 = Replika 1: [4]
- Tumpukan 4 = Replika 1: [5]
- Pekerja 1:
- Tumpukan 1 = Replika 2: [6, 7]
- Tumpukan 2 = Replika 2: [8, 9]
- Angkatan 3 = Replika 2: [10]
- Tumpukan 4 = Replika 2: [11]
DATA: Ini akan menghapus otomatis elemen di semua pekerja. Setiap pekerja akan membaca seluruh kumpulan data dan hanya memproses pecahan yang ditetapkan untuknya. Semua pecahan lainnya akan dibuang. Ini biasanya digunakan jika jumlah file input kurang dari jumlah pekerja dan Anda ingin sharding data yang lebih baik di semua pekerja. Sisi negatifnya adalah seluruh dataset akan dibaca pada setiap pekerja. Misalnya, mari kita mendistribusikan 1 file ke 2 pekerja. File 1 berisi [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2.
- Pekerja 0:
- Tumpukan 1 = Replika 1: [0, 1]
- Tumpukan 2 = Replika 1: [4, 5]
- Kelompok 3 = Replika 1: [8, 9]
- Pekerja 1:
- Tumpukan 1 = Replika 2: [2, 3]
- Tumpukan 2 = Replika 2: [6, 7]
- Angkatan 3 = Replika 2: [10, 11]
OFF: Jika Anda menonaktifkan autosharding, setiap pekerja akan memproses semua data. Misalnya, mari kita mendistribusikan 1 file ke 2 pekerja. File 1 berisi [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2. Kemudian setiap pekerja akan melihat distribusi berikut:
- Pekerja 0:
- Tumpukan 1 = Replika 1: [0, 1]
- Tumpukan 2 = Replika 1: [2, 3]
- Tumpukan 3 = Replika 1: [4, 5]
- Tumpukan 4 = Replika 1: [6, 7]
- Tumpukan 5 = Replika 1: [8, 9]
Tumpukan 6 = Replika 1: [10, 11]
Pekerja 1:
Tumpukan 1 = Replika 2: [0, 1]
Tumpukan 2 = Replika 2: [2, 3]
Gelombang 3 = Replika 2: [4, 5]
Tumpukan 4 = Replika 2: [6, 7]
Tumpukan 5 = Replika 2: [8, 9]
Angkatan 6 = Replika 2: [10, 11]
Mengambil lebih dulu
Secara default, tf.distribute
menambahkan transformasi prefetch di akhir instancetf.data.Dataset
disediakan pengguna. Argumen untuk transformasi prefetch yaitu buffer_size
sama dengan jumlah replika yang disinkronkan.
tf.distribute.Strategy.distribute_datasets_from_function
Pemakaian
API ini mengambil fungsi input dan mengembalikan instance tf.distribute.DistributedDataset
. Fungsi input yang diteruskan pengguna memiliki argumen tf.distribute.InputContext
dan harus mengembalikan instancetf.data.Dataset
. Dengan API ini, tf.distribute
tidak membuat perubahan apa pun lebih lanjut ke instancetf.data.Dataset
pengguna yang dikembalikan dari fungsi input. Merupakan tanggung jawab pengguna untuk mengumpulkan dan memecah kumpulan data. tf.distribute
memanggil fungsi input pada perangkat CPU masing-masing pekerja. Selain memungkinkan pengguna untuk menentukan logika batching dan sharding mereka sendiri, API ini juga menunjukkan skalabilitas dan performa yang lebih baik dibandingkan dengan tf.distribute.Strategy.experimental_distribute_dataset
saat digunakan untuk pelatihan multi pekerja.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Properti
Batching
tf.data.Dataset
yang merupakan nilai kembalian dari fungsi input harus di-batch menggunakan ukuran batch per replika. Ukuran tumpukan per replika adalah ukuran tumpukan global dibagi dengan jumlah replika yang mengambil bagian dalam pelatihan sinkronisasi. Ini karena tf.distribute
memanggil fungsi input pada perangkat CPU masing-masing pekerja. Kumpulan data yang dibuat pada pekerja tertentu harus siap digunakan oleh semua replika pekerja tersebut.
Sharding
Objek tf.distribute.InputContext
yang secara implisit diteruskan sebagai argumen ke fungsi input pengguna dibuat oleh tf.distribute
bawah tenda. Ini memiliki informasi tentang jumlah pekerja, id pekerja saat ini, dll. Fungsi input ini dapat menangani sharding sesuai kebijakan yang ditetapkan oleh pengguna menggunakan properti ini yang merupakan bagian dari objek tf.distribute.InputContext
.
Mengambil lebih dulu
tf.distribute
tidak menambahkan transformasi prefetch di akhirtf.data.Dataset
dikembalikan oleh fungsi input yang disediakan pengguna.
Iterator Terdistribusi
Mirip dengan instancetf.data.Dataset
terdistribusi, Anda perlu membuat iterator pada instance tf.distribute.DistributedDataset
untuk mengulanginya dan mengakses elemen di tf.distribute.DistributedDataset
. Berikut ini adalah cara-cara untuk membuat tf.distribute.DistributedIterator
dan menggunakannya untuk melatih model Anda:
Penggunaan
Gunakan Pythonic untuk konstruksi loop
Anda dapat menggunakan loop Pythonic yang ramah pengguna untuk melakukan iterasi pada tf.distribute.DistributedDataset
. Elemen yang dikembalikan dari tf.distribute.DistributedIterator
dapat berupa satu tf.Tensor
atau tf.distribute.DistributedValues
yang berisi nilai per replika. Menempatkan loop di dalam fungsi tf.function
akan memberikan peningkatan kinerja. Namun, break
dan return
saat ini tidak didukung untuk loop di atas tf.distribute.DistributedDataset
yang ditempatkan di dalam tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Gunakan iter
untuk membuat iterator eksplisit
Untuk mengulang elemen dalam instance tf.distribute.DistributedDataset
, Anda dapat membuat tf.distribute.DistributedIterator
menggunakan iter
API di atasnya. Dengan iterator eksplisit, Anda dapat melakukan iterasi untuk sejumlah langkah tetap. Dalam rangka untuk mendapatkan elemen berikutnya dari tf.distribute.DistributedIterator
contoh dist_iterator
, Anda dapat menghubungi next(dist_iterator)
, dist_iterator.get_next()
, atau dist_iterator.get_next_as_optional()
. Dua yang pertama pada dasarnya sama:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Dengan next()
atau tf.distribute.DistributedIterator.get_next()
, jika tf.distribute.DistributedIterator
telah mencapai akhir, kesalahan OutOfRange akan muncul. Klien dapat menangkap kesalahan di sisi python dan terus melakukan pekerjaan lain seperti checkpointing dan evaluasi. Namun, ini tidak akan berfungsi jika Anda menggunakan loop pelatihan host (yaitu, menjalankan beberapa langkah per tf.function
), yang terlihat seperti:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
berisi beberapa langkah dengan membungkus badan langkah di dalam tf.range
. Dalam kasus ini, iterasi yang berbeda dalam loop tanpa ketergantungan dapat dimulai secara paralel, sehingga kesalahan OutOfRange dapat dipicu di iterasi selanjutnya sebelum komputasi iterasi sebelumnya selesai. Setelah kesalahan OutOfRange terjadi, semua operasi dalam fungsi tersebut akan segera dihentikan. Jika ini adalah kasus yang ingin Anda hindari, alternatif yang tidak memunculkan kesalahan tf.distribute.DistributedIterator.get_next_as_optional()
adalah tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
mengembalikan tf.experimental.Optional
yang berisi elemen berikutnya atau tidak ada nilai jika tf.distribute.DistributedIterator
telah mencapai akhir.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Menggunakan properti element_spec
Jika Anda meneruskan elemen tf.function
data terdistribusi ke tf.function
dan menginginkan jaminan tf.TypeSpec
, Anda dapat menentukan argumen input_signature
dari tf.function
. Output dari dataset terdistribusi adalah tf.distribute.DistributedValues
yang dapat merepresentasikan input ke satu perangkat atau beberapa perangkat. Untuk mendapatkan tf.TypeSpec
sesuai dengan nilai terdistribusi ini, Anda dapat menggunakan properti element_spec
dari kumpulan data terdistribusi atau objek iterator terdistribusi.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Kelompok Parsial
Batch parsial dijumpai saat instancetf.data.Dataset
yang dibuat pengguna mungkin berisi ukuran batch yang tidak dapat dibagi secara merata dengan jumlah replika atau jika kardinalitas instanstf.data.Dataset
data tidak dapat dibagi oleh ukuran batch. Ini berarti bahwa ketika kumpulan data didistribusikan ke beberapa replika, panggilan next
pada beberapa iterator akan menghasilkan OutOfRangeError. Untuk menangani kasus penggunaan ini, tf.distribute
mengembalikan kumpulan dummy ukuran batch 0 pada replika yang tidak memiliki data lagi untuk diproses.
Untuk kasus pekerja tunggal, jika data tidak dikembalikan oleh panggilan next
pada iterator, kumpulan dummy 0 ukuran batch dibuat dan digunakan bersama dengan data nyata dalam set data. Dalam kasus kumpulan parsial, kumpulan data global terakhir akan berisi data nyata bersama kumpulan data tiruan. Kondisi berhenti untuk memproses data sekarang memeriksa apakah ada replika yang memiliki data. Jika tidak ada data di salah satu replika, kesalahan OutOfRange akan muncul.
Untuk kasus multi pekerja, nilai boolean yang mewakili keberadaan data pada setiap pekerja digabungkan menggunakan komunikasi replika silang dan ini digunakan untuk mengidentifikasi apakah semua pekerja telah selesai memproses kumpulan data terdistribusi. Karena ini melibatkan komunikasi lintas pekerja, ada beberapa hukuman kinerja yang terlibat.
Peringatan
Saat menggunakan API
tf.distribute.Strategy.experimental_distribute_dataset
dengan pengaturan banyak pekerja, pengguna meneruskantf.data.Dataset
yang membaca dari file. Jikatf.data.experimental.AutoShardPolicy
disetel keAUTO
atauFILE
, ukuran kumpulan per langkah sebenarnya mungkin lebih kecil daripada ukuran kumpulan global yang ditentukan pengguna. Ini bisa terjadi jika elemen yang tersisa di file lebih kecil dari ukuran batch global. Pengguna dapat menghabiskan kumpulan data tanpa bergantung pada jumlah langkah untuk menjalankan atau menyeteltf.data.experimental.AutoShardPolicy
keDATA
untuk mengatasinya.Transformasi
tf.distribute
data stateful saat ini tidak didukung dengantf.distribute
dan operasi stateful apa pun yang mungkin dimiliki dataset saat ini diabaikan. Misalnya, jika kumpulan data Anda memilikimap_fn
yang menggunakantf.random.uniform
untuk memutar gambar, maka Anda memiliki grafik kumpulan data yang bergantung pada status (yaitu benih acak) pada mesin lokal tempat proses python dijalankan.Eksperimental
tf.data.experimental.OptimizationOptions
yang dinonaktifkan secara default dapat dalam konteks tertentu - seperti ketika digunakan bersama dengantf.distribute
- menyebabkan penurunan kinerja. Anda hanya boleh mengaktifkannya setelah Anda memvalidasi bahwa mereka menguntungkan kinerja beban kerja Anda dalam pengaturan distribusi.Silakan merujuk ke panduan ini untuk mengetahui cara mengoptimalkan pipeline input Anda dengan
tf.data
secara umum. Beberapa tip tambahan:Jika Anda memiliki banyak pekerja dan menggunakan
tf.data.Dataset.list_files
untuk membuat kumpulan data dari semua file yang cocok dengan satu atau beberapa pola glob, ingatlah untuk menyetel argumenseed
atau setelshuffle=False
sehingga setiap pekerja memecah file secara konsisten.Jika pipeline input Anda menyertakan pengacakan data pada level record dan penguraian data, kecuali data yang tidak diurai secara signifikan lebih besar dari data yang diurai (yang biasanya bukan kasusnya), acak dulu lalu parse, seperti yang ditampilkan dalam contoh berikut. Ini dapat menguntungkan penggunaan dan kinerja memori.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
mempertahankan buffer internal elemenbuffer_size
, dan dengan demikian mengurangibuffer_size
dapat mengurangi masalah OOM.Urutan pemrosesan data oleh pekerja saat menggunakan
tf.distribute.experimental_distribute_dataset
atautf.distribute.distribute_datasets_from_function
tidak dijamin. Ini biasanya diperlukan jika Anda menggunakantf.distribute
to scale prediksi. Namun Anda dapat menyisipkan indeks untuk setiap elemen dalam batch dan mengurutkan keluarannya sesuai dengan itu. Cuplikan berikut adalah contoh cara mengurutkan keluaran.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
Bagaimana cara mendistribusikan data saya jika saya tidak menggunakan instance canonical tf.data.Dataset?
Terkadang pengguna tidak dapat menggunakantf.data.Dataset
untuk mewakili masukan mereka dan selanjutnya API yang disebutkan di atas untuk mendistribusikantf.data.Dataset
data ke beberapa perangkat. Dalam kasus seperti itu, Anda dapat menggunakan tensor atau input mentah dari generator.
Gunakan fungsi_distribusi_perimental_fungsi_perimental untuk input tensor arbitrer
strategy.run
menerima tf.distribute.DistributedValues
yang merupakan output dari next(iterator)
. Untuk meneruskan nilai tensor, gunakan experimental_distribute_values_from_function
untuk membuat tf.distribute.DistributedValues
dari tensor mentah.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Gunakan tf.data.Dataset.from_generator jika masukan Anda dari generator
Jika Anda memiliki fungsi generator yang ingin digunakan, Anda dapat membuat instancetf.data.Dataset
menggunakan API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)