This page was translated by the Cloud Translation API.
Switch to English

Formazione distribuita con TensorFlow

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza sorgente su GitHub Scarica il quaderno

Panoramica

tf.distribute.Strategy è un'API TensorFlow per distribuire la formazione su più GPU, più macchine o TPU. Utilizzando questa API, è possibile distribuire i modelli esistenti e il codice di training con modifiche minime al codice.

tf.distribute.Strategy è stato progettato tf.distribute.Strategy a questi obiettivi chiave:

  • Facile da usare e supportare più segmenti di utenti, inclusi ricercatori, ingegneri ML, ecc.
  • Fornire buone prestazioni fuori dalla scatola.
  • Facile passaggio tra le strategie.

tf.distribute.Strategy può essere utilizzato con un'API di alto livello come Keras e può anche essere utilizzato per distribuire cicli di addestramento personalizzati (e, in generale, qualsiasi calcolo che utilizza TensorFlow).

In TensorFlow 2.x, puoi eseguire i tuoi programmi con entusiasmo o in un grafico usando tf.function . tf.distribute.Strategy intende supportare entrambe queste modalità di esecuzione, ma funziona meglio con tf.function . La modalità Eager è consigliata solo a scopo di debug e non supportata per TPUStrategy . Sebbene discutiamo di formazione per la maggior parte del tempo in questa guida, questa API può essere utilizzata anche per distribuire valutazioni e previsioni su piattaforme diverse.

Puoi usare tf.distribute.Strategy con pochissime modifiche al tuo codice, perché abbiamo modificato i componenti sottostanti di TensorFlow per renderlo consapevole della strategia. Ciò include variabili, livelli, modelli, ottimizzatori, metriche, riepiloghi e punti di controllo.

In questa guida, spieghiamo vari tipi di strategie e come è possibile utilizzarle in diverse situazioni.

 # Import TensorFlow
import tensorflow as tf
 

Tipi di strategie

tf.distribute.Strategy intende coprire una serie di casi d'uso lungo assi diversi. Alcune di queste combinazioni sono attualmente supportate e altre verranno aggiunte in futuro. Alcuni di questi assi sono:

  • Formazione sincrona vs asincrona: questi sono due modi comuni di distribuire la formazione con il parallelismo dei dati. Nella formazione di sincronizzazione, tutti i lavoratori si allenano su diverse sezioni di dati di input in sincronia e aggregano i gradienti in ogni fase. Nell'addestramento asincrono, tutti i lavoratori si allenano indipendentemente sui dati di input e aggiornano le variabili in modo asincrono. In genere, l'addestramento di sincronizzazione è supportato tramite all-reduce e asincrono tramite l'architettura del server dei parametri.
  • Piattaforma hardware: potresti voler ridimensionare il tuo allenamento su più GPU su una macchina o su più macchine in una rete (con 0 o più GPU ciascuna) o su TPU Cloud.

Per supportare questi casi d'uso, sono disponibili sei strategie. Nella prossima sezione spiegheremo quali di questi sono supportati in quali scenari in TF 2.2 in questo momento. Ecco una rapida panoramica:

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API di Keras supportato supportato Supporto sperimentale Supporto sperimentale Post programmato supportato 2.3
Ciclo di allenamento personalizzato supportato supportato Supporto sperimentale Supporto sperimentale Post programmato supportato 2.3
API di stima Supporto limitato Non supportato Supporto limitato Supporto limitato Supporto limitato

MirroredStrategy

tf.distribute.MirroredStrategy supporta l'addestramento sincrono distribuito su più GPU su una macchina. Crea una replica per dispositivo GPU. Ogni variabile nel modello è speculare su tutte le repliche. Insieme, queste variabili formano un'unica variabile concettuale chiamata MirroredVariable . Queste variabili sono sincronizzate tra loro applicando aggiornamenti identici.

Algoritmi efficienti di riduzione ridotta vengono utilizzati per comunicare gli aggiornamenti variabili tra i dispositivi. Tensori aggregati a riduzione totale su tutti i dispositivi aggiungendoli e rendendoli disponibili su ciascun dispositivo. È un algoritmo fuso che è molto efficiente e può ridurre significativamente l'overhead della sincronizzazione. Sono disponibili molti algoritmi e implementazioni a riduzione ridotta, a seconda del tipo di comunicazione disponibile tra i dispositivi. Per impostazione predefinita, utilizza NVIDIA NCCL come implementazione a riduzione totale. Puoi scegliere tra alcune altre opzioni che forniamo o scrivere le tue.

Ecco il modo più semplice di creare MirroredStrategy :

 mirrored_strategy = tf.distribute.MirroredStrategy()
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Ciò creerà un'istanza di MirroredStrategy che utilizzerà tutte le GPU visibili a TensorFlow e utilizzerà NCCL come comunicazione tra dispositivi.

Se desideri utilizzare solo alcune delle GPU sul tuo computer, puoi farlo in questo modo:

 mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
 
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Se si desidera sovrascrivere la comunicazione tra dispositivi, è possibile farlo utilizzando l'argomento cross_device_ops fornendo un'istanza di tf.distribute.CrossDeviceOps . Attualmente, tf.distribute.HierarchicalCopyAllReduce e tf.distribute.ReductionToOneDevice sono due opzioni diverse da tf.distribute.NcclAllReduce che è l'impostazione predefinita.

 mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

tf.distribute.TPUStrategy ti consente di eseguire il tuo training TensorFlow su unità di elaborazione tensoriale (TPU). I TPU sono ASIC specializzati di Google progettati per accelerare notevolmente i carichi di lavoro di apprendimento automatico. Sono disponibili su Google Colab, TensorFlow Research Cloud e Cloud TPU .

In termini di architettura di formazione distribuita, TPUStrategy è la stessa MirroredStrategy : implementa la formazione distribuita sincrona. I TPU forniscono la propria implementazione di operazioni all-riducenti efficienti e altre operazioni collettive su più core TPU, che vengono utilizzati in TPUStrategy .

Ecco come TPUStrategy un'istanza di TPUStrategy :

 cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
 

L'istanza TPUClusterResolver aiuta a individuare i TPU. In Colab, non è necessario specificare alcun argomento.

Se si desidera utilizzare questo per TPU cloud:

  • È necessario specificare il nome della risorsa TPU nell'argomento tpu .
  • È necessario inizializzare esplicitamente il sistema tpu all'inizio del programma. Ciò è necessario prima che le TPU possano essere utilizzate per il calcolo. L'inizializzazione del sistema tpu cancella anche la memoria TPU, quindi è importante completare prima questo passaggio per evitare di perdere stato.

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy è molto simile a MirroredStrategy . Implementa la formazione distribuita sincrona tra più lavoratori, ciascuno con GPU potenzialmente multiple. Simile a MirroredStrategy , crea copie di tutte le variabili nel modello su ciascun dispositivo tra tutti i lavoratori.

Utilizza CollectiveOps come metodo di comunicazione multi-lavoratore multiuso utilizzato per mantenere sincronizzate le variabili. Un'operazione collettiva è una singola operazione nel grafico TensorFlow che può scegliere automaticamente un algoritmo di riduzione totale nel runtime di TensorFlow in base all'hardware, alla topologia di rete e alle dimensioni del tensore.

Implementa inoltre ulteriori ottimizzazioni delle prestazioni. Ad esempio, include un'ottimizzazione statica che converte molteplici riduzioni su tensori piccoli in meno riduzioni su tensori più grandi. Inoltre, lo stiamo progettando per avere un'architettura plug-in, in modo che in futuro sarai in grado di creare algoritmi plug-in che siano ottimizzati per il tuo hardware. Si noti che le operazioni collettive implementano anche altre operazioni collettive come la trasmissione e il raduno generale.

Ecco il modo più semplice di creare MultiWorkerMirroredStrategy :

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy attualmente consente di scegliere tra due diverse implementazioni di operazioni collettive. CollectiveCommunication.RING implementa collettivi basati su anello usando gRPC come livello di comunicazione. CollectiveCommunication.NCCL utilizza NCCL di Nvidia per implementare i collettivi. CollectiveCommunication.AUTO difende la scelta in fase di esecuzione. La scelta migliore dell'implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster. Puoi specificarli nel modo seguente:

 multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

Una delle differenze chiave per avviare la formazione multi-lavoratore, rispetto alla formazione multi-GPU, è la configurazione multi-lavoratore. La variabile d'ambiente TF_CONFIG è il modo standard in TensorFlow per specificare la configurazione del cluster per ciascun lavoratore che fa parte del cluster. Ulteriori informazioni sulla configurazione di TF_CONFIG .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy fa anche un training sincrono. Le variabili non sono speculari, ma vengono posizionate sulla CPU e le operazioni vengono replicate su tutte le GPU locali. Se esiste una sola GPU, tutte le variabili e le operazioni verranno posizionate su quella GPU.

Creare un'istanza di CentralStorageStrategy mediante:

 central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
 
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Ciò creerà un'istanza CentralStorageStrategy che utilizzerà tutte le GPU e CPU visibili. L'aggiornamento alle variabili sulle repliche verrà aggregato prima di essere applicato alle variabili.

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy supporta l'addestramento di server di parametri su più macchine. In questa configurazione, alcune macchine sono designate come worker e altre come server dei parametri. Ogni variabile del modello viene posizionata su un parametro server. Il calcolo viene replicato su tutte le GPU di tutti i lavoratori.

In termini di codice, sembra simile ad altre strategie:

 ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
 

Per la formazione multi-lavoratore, TF_CONFIG deve specificare la configurazione dei server dei parametri e dei lavoratori nel cluster, di cui è possibile leggere ulteriori informazioni in TF_CONFIG di seguito .

Altre strategie

Oltre alle strategie di cui sopra, ci sono altre due strategie che potrebbero essere utili per la prototipazione e il debug quando si utilizzano API tf.distribute .

Strategia predefinita

La strategia di default è una strategia di distribuzione che è presente quando nessuna strategia di distribuzione esplicita è nell'ambito. Implementa l'interfaccia tf.distribute.Strategy ma è un pass-through e non fornisce alcuna distribuzione effettiva. Ad esempio, strategy.run(fn) chiamerà semplicemente fn . Il codice scritto usando questa strategia dovrebbe comportarsi esattamente come il codice scritto senza alcuna strategia. Puoi pensarlo come una strategia "no-op".

La strategia predefinita è un singleton e non è possibile crearne più istanze. Può essere ottenuto utilizzando tf.distribute.get_strategy() al di fuori di qualsiasi ambito di strategia esplicita (la stessa API che può essere utilizzata per ottenere la strategia corrente all'interno di un ambito di strategia esplicita).

 default_strategy = tf.distribute.get_strategy()
 

Questa strategia ha due scopi principali:

  • Consente la scrittura incondizionata del codice della libreria consapevole della distribuzione. Ad esempio, in Optimizer, possiamo fare tf.distribute.get_strategy() e usare quella strategia per ridurre i gradienti: restituirà sempre un oggetto strategia su cui possiamo chiamare l'API di riduzione.
 # In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
 
1.0
  • Simile al codice della libreria, può essere utilizzato per scrivere i programmi degli utenti finali per lavorare con e senza strategia di distribuzione, senza richiedere una logica condizionale. Un frammento di codice di esempio che illustra questo:
 if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
 
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy è una strategia per posizionare tutte le variabili e il calcolo su un singolo dispositivo specificato.

 strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
 

Questa strategia è distinta dalla strategia predefinita in vari modi. Nella strategia predefinita, la logica di posizionamento variabile rimane invariata rispetto all'esecuzione di TensorFlow senza alcuna strategia di distribuzione. Ma quando si utilizza OneDeviceStrategy , tutte le variabili create nel suo ambito vengono esplicitamente posizionate sul dispositivo specificato. Inoltre, tutte le funzioni chiamate tramite OneDeviceStrategy.run verranno posizionate sul dispositivo specificato.

Gli input distribuiti tramite questa strategia verranno precaricati sul dispositivo specificato. Nella strategia predefinita, non esiste una distribuzione di input.

Simile alla strategia predefinita, questa strategia potrebbe anche essere utilizzata per testare il codice prima di passare ad altre strategie che si distribuiscono effettivamente su più dispositivi / macchine. Questo eserciterà il meccanismo della strategia di distribuzione in qualche modo più della strategia predefinita, ma non nella misura in cui si utilizza MirroredStrategy o TPUStrategy ecc. Se si desidera un codice che si comporta come se non esistesse una strategia, utilizzare la strategia predefinita.

Finora abbiamo parlato di quali sono le diverse strategie disponibili e come è possibile istanziarle. Nelle prossime sezioni parleremo dei diversi modi in cui puoi usarli per distribuire il tuo allenamento. Mostreremo frammenti di codice breve in questa guida e rimanderemo a tutorial completi che puoi eseguire end-to-end.

Utilizzo di tf.distribute.Strategy con tf.keras.Model.fit

Abbiamo integrato tf.distribute.Strategy in tf.keras che è l'implementazione di TensorFlow della specifica dell'API di Keras . tf.keras è tf.keras alto livello per costruire e formare modelli. Integrando nel back tf.keras end di tf.keras , abbiamo reso semplice la distribuzione dei tuoi allenamenti scritti nel framework di allenamento di Keras usando model.fit .

Ecco cosa devi modificare nel tuo codice:

  1. Creare un'istanza del tf.distribute.Strategy appropriato.
  2. Sposta la creazione del modello, dell'ottimizzatore e delle metriche di Keras all'interno di strategy.scope .

Supportiamo tutti i tipi di modelli Keras: sequenziali, funzionali e in sottoclasse.

Ecco uno snippet di codice per fare questo per un modello Keras molto semplice con un livello denso:

 mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

In questo esempio abbiamo usato MirroredStrategy modo da poterlo eseguire su una macchina con più GPU. strategy.scope() indica a Keras quale strategia utilizzare per distribuire l'addestramento. La creazione di modelli / ottimizzatori / metriche all'interno di questo ambito ci consente di creare variabili distribuite anziché variabili normali. Una volta impostato, puoi adattare il tuo modello come faresti normalmente. MirroredStrategy si occupa di replicare la formazione del modello sulle GPU disponibili, aggregando i gradienti e altro.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
 
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.0035
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.4436
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.2755

0.27546340227127075

Qui abbiamo usato un file tf.data.Dataset per fornire la formazione e l'input di tf.data.Dataset . È inoltre possibile utilizzare matrici intorpidite:

 import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
 
Epoch 1/2
10/10 [==============================] - 0s 1ms/step - loss: 0.1961
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.0867

<tensorflow.python.keras.callbacks.History at 0x7fa17808e240>

In entrambi i casi (set di dati o numpy), ciascun batch dell'input specificato viene diviso equamente tra le repliche multiple. Ad esempio, se si utilizza MirroredStrategy con 2 GPU, ogni batch di dimensioni 10 verrà diviso tra le 2 GPU, ciascuna delle quali riceverà 5 esempi di input in ogni passaggio. Ogni epoca si allenerà più velocemente quando si aggiungono più GPU. In genere, si desidera aumentare le dimensioni del batch quando si aggiungono più acceleratori in modo da sfruttare efficacemente la potenza di elaborazione aggiuntiva. Dovrai anche risintonizzare il tuo tasso di apprendimento, a seconda del modello. È possibile utilizzare strategy.num_replicas_in_sync per ottenere il numero di repliche.

 # Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
 

Cosa è supportato ora?

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API di Keras supportato supportato Supporto sperimentale Supporto sperimentale Supporto post programmato 2.3

Esempi ed esercitazioni

Ecco un elenco di tutorial ed esempi che illustrano l'integrazione di cui sopra end to end con Keras:

  1. Tutorial per allenare MNIST con MirroredStrategy .
  2. Tutorial per addestrare MNIST utilizzando MultiWorkerMirroredStrategy .
  3. Guida alla formazione di MNIST tramite TPUStrategy .
  4. Repository TensorFlow Model Garden contenente raccolte di modelli all'avanguardia implementati utilizzando varie strategie.

Utilizzo di tf.distribute.Strategy con cicli di allenamento personalizzati

Come hai visto, usare tf.distribute.Strategy con Keras model.fit richiede di modificare solo un paio di righe del codice. Con un po 'più di sforzo, puoi anche usare tf.distribute.Strategy con cicli di allenamento personalizzati.

Se hai bisogno di maggiore flessibilità e controllo sui tuoi cicli di allenamento di quanto sia possibile con Estimator o Keras, puoi scrivere cicli di allenamento personalizzati. Ad esempio, quando si utilizza un GAN, è possibile che si desideri eseguire un numero diverso di fasi generatore o discriminatore ogni round. Allo stesso modo, i quadri di alto livello non sono molto adatti per la formazione sull'apprendimento per rinforzo.

Per supportare i cicli di formazione personalizzati, forniamo un set di metodi di base tramite le classi tf.distribute.Strategy . L'uso di questi potrebbe richiedere inizialmente una piccola ristrutturazione del codice, ma una volta fatto ciò, dovresti essere in grado di passare da GPU, TPU e più macchine semplicemente cambiando l'istanza della strategia.

Qui mostreremo un breve frammento che illustra questo caso d'uso per un semplice esempio di allenamento usando lo stesso modello Keras di prima.

Innanzitutto, creiamo il modello e l'ottimizzatore nell'ambito della strategia. Ciò garantisce che tutte le variabili create con il modello e l'ottimizzatore siano variabili speculari.

 with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()
 

Successivamente, creiamo il set di dati di input e chiamiamo tf.distribute.Strategy.experimental_distribute_dataset per distribuire il set di dati in base alla strategia.

 dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
 

Quindi, definiamo un passaggio della formazione. Useremo tf.GradientTape per calcolare i gradienti e l'ottimizzatore per applicare quei gradienti per aggiornare le variabili del nostro modello. Per distribuire questo passaggio di formazione, inseriamo una funzione train_step e la passiamo a tf.distrbute.Strategy.run insieme agli input del set di dati che otteniamo da dist_dataset creato prima:

 loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
 

Alcune altre cose da notare nel codice sopra:

  1. Abbiamo usato tf.nn.compute_average_loss per calcolare la perdita. tf.nn.compute_average_loss somma la perdita per esempio e divide la somma per global_batch_size. Questo è importante perché in seguito, dopo aver calcolato i gradienti su ciascuna replica, vengono aggregati attraverso le repliche sommandoli .
  2. Abbiamo usato l'API tf.distribute.Strategy.reduce per aggregare i risultati restituiti da tf.distribute.Strategy.run . tf.distribute.Strategy.run restituisce risultati da ciascuna replica locale nella strategia e esistono diversi modi per utilizzare questo risultato. Puoi reduce per ottenere un valore aggregato. Puoi anche fare tf.distribute.Strategy.experimental_local_results per ottenere l'elenco dei valori contenuti nel risultato, uno per replica locale.
  3. Quando si apply_gradients nell'ambito di una strategia di distribuzione, il suo comportamento viene modificato. In particolare, prima di applicare i gradienti su ciascuna istanza parallela durante l'allenamento sincrono, esegue una somma totale delle repliche dei gradienti.

Infine, una volta definito il passaggio di allenamento, possiamo iterare su dist_dataset ed eseguire l'allenamento in un ciclo:

 for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
 
tf.Tensor(0.4155251, shape=(), dtype=float32)
tf.Tensor(0.41321823, shape=(), dtype=float32)
tf.Tensor(0.4109319, shape=(), dtype=float32)
tf.Tensor(0.40866604, shape=(), dtype=float32)
tf.Tensor(0.40642032, shape=(), dtype=float32)
tf.Tensor(0.40419456, shape=(), dtype=float32)
tf.Tensor(0.4019885, shape=(), dtype=float32)
tf.Tensor(0.399802, shape=(), dtype=float32)
tf.Tensor(0.39763477, shape=(), dtype=float32)
tf.Tensor(0.3954866, shape=(), dtype=float32)
tf.Tensor(0.39335734, shape=(), dtype=float32)
tf.Tensor(0.3912467, shape=(), dtype=float32)
tf.Tensor(0.38915452, shape=(), dtype=float32)
tf.Tensor(0.38708064, shape=(), dtype=float32)
tf.Tensor(0.38502476, shape=(), dtype=float32)
tf.Tensor(0.38298675, shape=(), dtype=float32)
tf.Tensor(0.38096642, shape=(), dtype=float32)
tf.Tensor(0.3789635, shape=(), dtype=float32)
tf.Tensor(0.3769779, shape=(), dtype=float32)
tf.Tensor(0.37500936, shape=(), dtype=float32)

Nell'esempio sopra, abbiamo ripetuto il dist_dataset per fornire input al tuo allenamento. Forniamo anche tf.distribute.Strategy.make_experimental_numpy_dataset per supportare input numpy. È possibile utilizzare questa API per creare un set di dati prima di chiamare tf.distribute.Strategy.experimental_distribute_dataset .

Un altro modo di scorrere i dati è utilizzare esplicitamente gli iteratori. È possibile che si desideri eseguire ciò quando si desidera eseguire un determinato numero di passaggi anziché eseguire iterazioni sull'intero set di dati. L'iterazione di cui sopra verrebbe ora modificata per creare prima un iteratore e quindi chiamare esplicitamente il next per ottenere i dati di input.

 iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
 
tf.Tensor(0.37305772, shape=(), dtype=float32)
tf.Tensor(0.3711228, shape=(), dtype=float32)
tf.Tensor(0.3692044, shape=(), dtype=float32)
tf.Tensor(0.36730233, shape=(), dtype=float32)
tf.Tensor(0.3654165, shape=(), dtype=float32)
tf.Tensor(0.36354658, shape=(), dtype=float32)
tf.Tensor(0.36169255, shape=(), dtype=float32)
tf.Tensor(0.3598542, shape=(), dtype=float32)
tf.Tensor(0.35803124, shape=(), dtype=float32)
tf.Tensor(0.3562236, shape=(), dtype=float32)

Questo copre il caso più semplice di utilizzo dell'API tf.distribute.Strategy per distribuire cicli di addestramento personalizzati. Stiamo migliorando queste API. Poiché questo caso d'uso richiede più lavoro per adattare il codice, pubblicheremo in futuro una guida dettagliata separata.

Cosa è supportato ora?

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Ciclo di allenamento personalizzato supportato supportato Supporto sperimentale Supporto sperimentale Supporto post programmato 2.3

Esempi ed esercitazioni

Ecco alcuni esempi per l'utilizzo della strategia di distribuzione con cicli di formazione personalizzati:

  1. Tutorial per addestrare MNIST usando MirroredStrategy .
  2. Guida alla formazione di MNIST tramite TPUStrategy .
  3. Repository TensorFlow Model Garden contenente raccolte di modelli all'avanguardia implementati utilizzando varie strategie.

Utilizzo di tf.distribute.Strategy con Estimator (supporto limitato)

tf.estimator è un'API TensorFlow di formazione distribuita che originariamente supportava l'approccio asincrono dei parametri del server. Come con Keras, abbiamo integrato tf.distribute.Strategy in tf.Estimator . Se stai utilizzando Estimator per il tuo allenamento, puoi facilmente passare al training distribuito con pochissime modifiche al tuo codice. Con questo, gli utenti di Estimator possono ora eseguire formazione distribuita sincrona su più GPU e più lavoratori, nonché utilizzare TPU. Questo supporto in Estimator è, tuttavia, limitato. Vedi la sezione Cosa è supportato ora di seguito per maggiori dettagli.

L'uso di tf.distribute.Strategy con Estimator è leggermente diverso dal caso Keras. Invece di usare strategy.scope , ora passiamo l'oggetto strategia in RunConfig per Estimator.

Ecco uno snippet di codice che lo mostra con uno stimatore LinearRegressor e MirroredStrategy LinearRegressor :

 mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmp2ack9oru
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmp2ack9oru', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7fa124522b38>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

Qui utilizziamo uno stimatore premade, ma lo stesso codice funziona anche con uno stimatore personalizzato. train_distribute determina come sarà distribuita la formazione e eval_distribute determina come verrà distribuita la valutazione. Questa è un'altra differenza rispetto a Keras, dove utilizziamo la stessa strategia sia per l'allenamento che per la valutazione.

Ora possiamo addestrare e valutare questo stimatore con una funzione di input:

 def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
 
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa12452cb70> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmp2ack9oru/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7fa1e9768d08> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-08-04T20:28:12Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp2ack9oru/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.20350s
INFO:tensorflow:Finished evaluation at 2020-08-04-20:28:12
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmp2ack9oru/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

Un'altra differenza da evidenziare qui tra Estimator e Keras è la gestione degli input. In Keras, abbiamo menzionato che ciascun batch del set di dati viene suddiviso automaticamente tra le repliche multiple. In Estimator, tuttavia, non eseguiamo la suddivisione automatica del batch, né suddividiamo automaticamente i dati tra diversi lavoratori. Hai il pieno controllo su come desideri che i tuoi dati vengano distribuiti tra lavoratori e dispositivi e devi fornire un input_fn per specificare come distribuire i tuoi dati.

Il tuo input_fn viene chiamato una volta per lavoratore, fornendo così un set di dati per lavoratore. Quindi un batch da quel set di dati viene inviato a una replica su quel lavoratore, consumando in tal modo N lotti per N repliche su 1 lavoratore. In altre parole, il set di dati restituito da input_fn dovrebbe fornire batch di dimensioni PER_REPLICA_BATCH_SIZE . E la dimensione globale del batch per un passaggio può essere ottenuta come PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync .

Durante la formazione multi-lavoratore, è necessario dividere i dati tra i lavoratori o mescolarli con un seme casuale su ciascuno. Puoi vedere un esempio di come eseguire ciò nella formazione multi-lavoratore con Estimator .

Allo stesso modo, è possibile utilizzare anche strategie multi-lavoratore e di parametro. Il codice rimane lo stesso, ma è necessario utilizzare tf.estimator.train_and_evaluate e impostare le variabili di ambiente TF_CONFIG per ciascun binario in esecuzione nel cluster.

Cosa è supportato ora?

C'è un supporto limitato per l'allenamento con Estimator usando tutte le strategie tranne TPUStrategy . La formazione e la valutazione di base dovrebbero funzionare, ma una serie di funzionalità avanzate come l'impalcatura non funziona ancora. In questa integrazione potrebbero esserci anche diversi bug. Al momento, non intendiamo migliorare attivamente questo supporto e ci concentriamo invece su Keras e sul supporto del ciclo di formazione personalizzato. Se possibile, è preferibile utilizzare tf.distribute con tali API.

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
API di stima Supporto limitato Non supportato Supporto limitato Supporto limitato Supporto limitato

Esempi ed esercitazioni

Ecco alcuni esempi che mostrano l'utilizzo end to end di varie strategie con Estimator:

  1. Formazione multi-lavoratore con Estimator per formare MNIST con più lavoratori utilizzando MultiWorkerMirroredStrategy .
  2. Esempio end-to-end per la formazione multi-lavoratore in tensorflow / ecosistema utilizzando i modelli Kubernetes. Questo esempio inizia con un modello Keras e lo converte in uno stimatore utilizzando l'API tf.keras.estimator.model_to_estimator .
  3. Modello ufficiale ResNet50 , che può essere addestrato utilizzando MirroredStrategy o MultiWorkerMirroredStrategy .

Altri argomenti

In questa sezione, tratteremo alcuni argomenti rilevanti per più casi d'uso.

Impostazione della variabile d'ambiente TF_CONFIG

Per la formazione multi-lavoratore, come accennato in precedenza, è necessario impostare la variabile d'ambiente TF_CONFIG per ciascun binario in esecuzione nel cluster. La variabile d'ambiente TF_CONFIG è una stringa JSON che specifica quali attività costituiscono un cluster, i loro indirizzi e il ruolo di ciascuna attività nel cluster. Forniamo un modello Kubernetes nel repository tensorflow / ecosistema che imposta TF_CONFIG per le attività di formazione.

Vi sono due componenti di TF_CONFIG: cluster e task. cluster fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavori come il lavoratore. Nella formazione multi-lavoratore, di solito c'è un lavoratore che si assume un po 'più di responsabilità come il salvataggio del checkpoint e la scrittura del file di riepilogo per TensorBoard oltre a quello che fa un lavoratore normale. Tale lavoratore è indicato come lavoratore "principale" ed è consuetudine che il lavoratore con indice 0 sia nominato lavoratore principale (in effetti è così che viene implementato tf.distribute.Strategy). l'attività d'altra parte fornisce informazioni sull'attività corrente. Il primo cluster di componenti è uguale per tutti i lavoratori e il secondo compito del componente è diverso per ogni lavoratore e specifica il tipo e l'indice di quel lavoratore.

Un esempio di TF_CONFIG è:

 os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})
 

Questo TF_CONFIG specifica che ci sono tre lavoratori e due attività ps nel cluster insieme ai loro host e porte. La parte "task" specifica che il ruolo dell'attività corrente nel cluster, worker 1 (il secondo lavoratore). I ruoli validi in un cluster sono "capo", "lavoratore", "ps" e "valutatore". Non dovrebbe esserci lavoro "ps" se non quando si utilizza tf.distribute.experimental.ParameterServerStrategy .

Qual è il prossimo?

tf.distribute.Strategy è attivamente in fase di sviluppo. Ti invitiamo a provarlo e a fornirti il ​​tuo feedback utilizzando i problemi di GitHub .