Formazione distribuita con TensorFlow

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

Panoramica

tf.distribute.Strategy è un'API TensorFlow per distribuire la formazione su più GPU, più macchine o TPU. Utilizzando questa API, puoi distribuire i tuoi modelli esistenti e il codice di addestramento con modifiche minime al codice.

tf.distribute.Strategy è stato progettato tenendo presenti questi obiettivi chiave:

  • Facile da usare e supporta più segmenti di utenti, inclusi ricercatori, ingegneri di apprendimento automatico, ecc.
  • Fornire buone prestazioni fuori dagli schemi.
  • Facile passaggio da una strategia all'altra.

Puoi distribuire la formazione utilizzando tf.distribute.Strategy con un'API di alto livello come Keras Model.fit , nonché cicli di formazione personalizzati (e, in generale, qualsiasi calcolo utilizzando TensorFlow).

In TensorFlow 2.x, puoi eseguire i tuoi programmi avidamente o in un grafico usando tf.function . tf.distribute.Strategy intende supportare entrambe queste modalità di esecuzione, ma funziona meglio con tf.function . La modalità Eager è consigliata solo per scopi di debug e non è supportata per tf.distribute.TPUStrategy . Sebbene la formazione sia l'obiettivo di questa guida, questa API può essere utilizzata anche per distribuire valutazioni e previsioni su piattaforme diverse.

Puoi usare tf.distribute.Strategy con pochissime modifiche al tuo codice, perché i componenti sottostanti di TensorFlow sono stati modificati per diventare consapevoli della strategia. Ciò include variabili, livelli, modelli, ottimizzatori, metriche, riepiloghi e checkpoint.

In questa guida imparerai vari tipi di strategie e come utilizzarle in diverse situazioni. Per informazioni su come eseguire il debug dei problemi di prestazioni, consulta la guida alle prestazioni della GPU Ottimizzazione TensorFlow .

Imposta TensorFlow

import tensorflow as tf

Tipi di strategie

tf.distribute.Strategy intende coprire una serie di casi d'uso lungo diversi assi. Alcune di queste combinazioni sono attualmente supportate e altre verranno aggiunte in futuro. Alcuni di questi assi sono:

  • Addestramento sincrono e asincrono: questi sono due modi comuni per distribuire l'addestramento con il parallelismo dei dati. Nella formazione sulla sincronizzazione, tutti i lavoratori si allenano su diverse sezioni di dati di input sincronizzati e aggregando i gradienti a ogni passaggio. Nella formazione asincrona, tutti i lavoratori si addestrano in modo indipendente sui dati di input e aggiornano le variabili in modo asincrono. In genere il training di sincronizzazione è supportato tramite all-reduce e async tramite l'architettura del server dei parametri.
  • Piattaforma hardware: potresti voler ridimensionare la tua formazione su più GPU su una macchina o su più macchine in una rete (con 0 o più GPU ciascuna) o su Cloud TPU.

Per supportare questi casi d'uso, TensorFlow ha MirroredStrategy , TPUStrategy , MultiWorkerMirroredStrategy , ParameterServerStrategy , CentralStorageStrategy e altre strategie disponibili. La sezione successiva spiega quali di questi sono supportati in quali scenari in TensorFlow. Ecco una rapida panoramica:

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Model.fit Supportato Supportato Supportato Supporto sperimentale Supporto sperimentale
Ciclo di allenamento personalizzato Supportato Supportato Supportato Supporto sperimentale Supporto sperimentale
API di stima Supporto limitato Non supportato Supporto limitato Supporto limitato Supporto limitato

MirroredStrategy

tf.distribute.MirroredStrategy supporta l'addestramento distribuito sincrono su più GPU su una macchina. Crea una replica per dispositivo GPU. Ogni variabile nel modello viene rispecchiata in tutte le repliche. Insieme, queste variabili formano un'unica variabile concettuale denominata MirroredVariable . Queste variabili vengono mantenute sincronizzate tra loro applicando aggiornamenti identici.

Efficienti algoritmi di riduzione totale vengono utilizzati per comunicare gli aggiornamenti delle variabili tra i dispositivi. All-reduce aggrega i tensori su tutti i dispositivi sommandoli e rendendoli disponibili su ciascun dispositivo. È un algoritmo fuso che è molto efficiente e può ridurre significativamente il sovraccarico della sincronizzazione. Sono disponibili molti algoritmi e implementazioni all-reduce, a seconda del tipo di comunicazione disponibile tra i dispositivi. Per impostazione predefinita, utilizza la NVIDIA Collective Communication Library ( NCCL ) come implementazione all-reduce. Puoi scegliere tra alcune altre opzioni o scriverne una tua.

Ecco il modo più semplice per creare MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Questo creerà un'istanza MirroredStrategy , che utilizzerà tutte le GPU visibili a TensorFlow e NCCL, come comunicazione tra dispositivi.

Se desideri utilizzare solo alcune delle GPU sulla tua macchina, puoi farlo in questo modo:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Se desideri ignorare la comunicazione tra dispositivi, puoi farlo utilizzando l'argomento cross_device_ops fornendo un'istanza di tf.distribute.CrossDeviceOps . Attualmente, tf.distribute.HierarchicalCopyAllReduce e tf.distribute.ReductionToOneDevice sono due opzioni diverse da tf.distribute.NcclAllReduce , che è l'impostazione predefinita.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategia

tf.distribute.TPUStrategy ti consente di eseguire il tuo training TensorFlow su Tensor Processing Unit (TPU) . I TPU sono gli ASIC specializzati di Google progettati per accelerare notevolmente i carichi di lavoro di machine learning. Sono disponibili su Google Colab , TPU Research Cloud e Cloud TPU .

In termini di architettura di formazione distribuita, TPUStrategy è la stessa MirroredStrategy : implementa la formazione distribuita sincrona. Le TPU forniscono la propria implementazione di efficienti operazioni di riduzione totale e altre operazioni collettive su più core TPU, che vengono utilizzati in TPUStrategy .

Ecco come istanziare TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

L'istanza TPUClusterResolver aiuta a individuare le TPU. In Colab, non è necessario specificare alcun argomento.

Se vuoi usarlo per le Cloud TPU:

  • Devi specificare il nome della tua risorsa TPU nell'argomento tpu .
  • È necessario inizializzare il sistema TPU in modo esplicito all'avvio del programma. Ciò è necessario prima che le TPU possano essere utilizzate per il calcolo. L'inizializzazione del sistema TPU cancella anche la memoria TPU, quindi è importante completare prima questo passaggio per evitare di perdere lo stato.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy è molto simile a MirroredStrategy . Implementa la formazione distribuita sincrona tra più lavoratori, ciascuno con potenzialmente più GPU. Simile a tf.distribute.MirroredStrategy , crea copie di tutte le variabili nel modello su ogni dispositivo in tutti i lavoratori.

Ecco il modo più semplice per creare MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy ha due implementazioni per le comunicazioni tra dispositivi. CommunicationImplementation.RING è basato su RPC e supporta sia CPU che GPU. CommunicationImplementation.NCCL utilizza NCCL e fornisce prestazioni all'avanguardia sulle GPU ma non supporta le CPU. CollectiveCommunication.AUTO rinvia la scelta a Tensorflow. Puoi specificarli nel modo seguente:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Una delle differenze chiave per avviare la formazione multi-operatore, rispetto alla formazione multi-GPU, è la configurazione multi-lavoratore. La variabile di ambiente 'TF_CONFIG' è il modo standard in TensorFlow per specificare la configurazione del cluster per ogni lavoratore che fa parte del cluster. Scopri di più nella sezione relativa alla configurazione di TF_CONFIG di questo documento.

Per maggiori dettagli su MultiWorkerMirroredStrategy , considera i seguenti tutorial:

ParameterServerStrategy

L'addestramento del server dei parametri è un metodo parallelo ai dati comune per aumentare l'addestramento del modello su più macchine. Un cluster di formazione del server dei parametri è costituito da lavoratori e server dei parametri. Le variabili vengono create sui server dei parametri e vengono lette e aggiornate dagli operatori in ogni passaggio. Per i dettagli, consulta l'esercitazione di addestramento del server dei parametri .

In TensorFlow 2, la formazione del server dei parametri utilizza un'architettura basata su coordinatore centrale tramite la classe tf.distribute.experimental.coordinator.ClusterCoordinator .

In questa implementazione, le attività del server di worker e parameter server eseguono tf.distribute.Server che ascoltano le attività dal coordinatore. Il coordinatore crea risorse, invia attività di formazione, scrive checkpoint e si occupa degli errori delle attività.

Nella programmazione in esecuzione sul coordinatore, utilizzerai un oggetto ParameterServerStrategy per definire una fase di formazione e utilizzerai un ClusterCoordinator per inviare le fasi di formazione ai lavoratori remoti. Ecco il modo più semplice per crearli:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Per ulteriori informazioni su ParameterServerStrategy , consulta l'esercitazione sul server dei parametri con Keras Model.fit e un'esercitazione sul ciclo di addestramento personalizzato .

In TensorFlow 1, ParameterServerStrategy è disponibile solo con uno Estimator tramite il simbolo tf.compat.v1.distribute.experimental.ParameterServerStrategy .

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy esegue anche il training sincrono. Le variabili non vengono replicate, ma vengono posizionate sulla CPU e le operazioni vengono replicate su tutte le GPU locali. Se è presente una sola GPU, tutte le variabili e le operazioni verranno posizionate su quella GPU.

Crea un'istanza di CentralStorageStrategy tramite:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Questo creerà un'istanza di CentralStorageStrategy che utilizzerà tutte le GPU e CPU visibili. L'aggiornamento alle variabili sulle repliche verrà aggregato prima di essere applicato alle variabili.

Altre strategie

Oltre alle strategie di cui sopra, ci sono altre due strategie che potrebbero essere utili per la prototipazione e il debug quando si utilizzano le API tf.distribute .

Strategia predefinita

La strategia predefinita è una strategia di distribuzione che è presente quando non è prevista alcuna strategia di distribuzione esplicita. Implementa l'interfaccia tf.distribute.Strategy ma è un pass-through e non fornisce alcuna distribuzione effettiva. Ad esempio, Strategy.run(fn) chiamerà semplicemente fn . Il codice scritto utilizzando questa strategia dovrebbe comportarsi esattamente come il codice scritto senza alcuna strategia. Puoi pensarla come una strategia "no-op".

La strategia predefinita è un singleton e non è possibile crearne più istanze. Può essere ottenuto utilizzando tf.distribute.get_strategy al di fuori dell'ambito di qualsiasi strategia esplicita (la stessa API che può essere utilizzata per ottenere la strategia corrente all'interno dell'ambito di una strategia esplicita).

default_strategy = tf.distribute.get_strategy()

Questa strategia ha due scopi principali:

  • Consente di scrivere incondizionatamente codice di libreria sensibile alla distribuzione. Ad esempio, in tf.optimizer s puoi usare tf.distribute.get_strategy e usare quella strategia per ridurre i gradienti: restituirà sempre un oggetto strategia su cui puoi chiamare l'API Strategy.reduce .
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Simile al codice della libreria, può essere utilizzato per scrivere i programmi degli utenti finali per lavorare con e senza la strategia di distribuzione, senza richiedere la logica condizionale. Ecco un frammento di codice di esempio che illustra questo:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy è una strategia per posizionare tutte le variabili e il calcolo su un singolo dispositivo specificato.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Questa strategia è distinta dalla strategia predefinita in diversi modi. Nella strategia predefinita, la logica di posizionamento delle variabili rimane invariata rispetto all'esecuzione di TensorFlow senza alcuna strategia di distribuzione. Ma quando si usa OneDeviceStrategy , tutte le variabili create nel suo ambito vengono posizionate in modo esplicito nel dispositivo specificato. Inoltre, tutte le funzioni richiamate tramite OneDeviceStrategy.run verranno posizionate anche sul dispositivo specificato.

L'input distribuito attraverso questa strategia verrà precaricato sul dispositivo specificato. Nella strategia predefinita, non esiste una distribuzione dell'input.

Simile alla strategia predefinita, questa strategia può essere utilizzata anche per testare il codice prima di passare ad altre strategie che effettivamente distribuiscono su più dispositivi/macchine. Ciò eserciterà il meccanismo della strategia di distribuzione un po' più della strategia predefinita, ma non nella misura massima dell'utilizzo, ad esempio, di MirroredStrategy o TPUStrategy . Se si desidera un codice che si comporti come se non esistesse alcuna strategia, utilizzare la strategia predefinita.

Finora hai imparato diverse strategie e come crearne un'istanza. Le prossime sezioni mostrano i diversi modi in cui puoi usarle per distribuire la tua formazione.

Usa tf.distribute.Strategy con Keras Model.fit

tf.distribute.Strategy è integrato in tf.keras , che è l'implementazione di TensorFlow della specifica API Keras . tf.keras è un'API di alto livello per creare e addestrare modelli. Grazie all'integrazione nel backend tf.keras , è possibile distribuire senza problemi la formazione scritta nel framework di formazione Keras utilizzando Model.fit .

Ecco cosa devi modificare nel tuo codice:

  1. Creare un'istanza dell'appropriato tf.distribute.Strategy .
  2. Sposta la creazione del modello Keras, dell'ottimizzatore e delle metriche all'interno strategy.scope .

Le strategie di distribuzione TensorFlow supportano tutti i tipi di modelli Keras: sequenziale , funzionale e sottoclasse .

Ecco un frammento di codice per farlo per un modello Keras molto semplice con un livello Dense :

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

Questo esempio usa MirroredStrategy , quindi puoi eseguirlo su una macchina con più GPU. strategy.scope() indica a Keras quale strategia utilizzare per distribuire la formazione. La creazione di modelli/ottimizzatori/metriche all'interno di questo ambito consente di creare variabili distribuite anziché variabili normali. Una volta impostato, puoi adattare il tuo modello come faresti normalmente. MirroredStrategy si occupa di replicare il training del modello sulle GPU disponibili, aggregando gradienti e altro.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 2.2552
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9968
2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.6190
0.6190494298934937

Qui un tf.data.Dataset fornisce l'input di addestramento e valutazione. Puoi anche usare gli array NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 0.4406
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1947
<keras.callbacks.History at 0x7fb81813d2d0>

In entrambi i casi, con Dataset o NumPy, ogni batch dell'input specificato viene diviso equamente tra le repliche multiple. Ad esempio, se stai utilizzando MirroredStrategy con 2 GPU, ogni batch di dimensione 10 verrà diviso tra le 2 GPU, ciascuna delle quali riceverà 5 esempi di input in ogni passaggio. Ogni epoca si allenerà quindi più velocemente man mano che aggiungi più GPU. In genere, si desidera aumentare le dimensioni del batch man mano che si aggiungono più acceleratori, in modo da utilizzare in modo efficace la potenza di calcolo aggiuntiva. Dovrai anche risintonizzare il tuo tasso di apprendimento, a seconda del modello. Puoi utilizzare strategy.num_replicas_in_sync per ottenere il numero di repliche.

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Cosa è supportato ora?

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Model.fit Supportato Supportato Supportato Supporto sperimentale Supporto sperimentale

Esempi e tutorial

Ecco un elenco di tutorial ed esempi che illustrano l'integrazione end-to-end di cui sopra con Keras Model.fit :

  1. Tutorial : Allenamento con Model.fit e MirroredStrategy .
  2. Tutorial : Formazione con Model.fit e MultiWorkerMirroredStrategy .
  3. Guida : contiene un esempio di utilizzo di Model.fit e TPUStrategy .
  4. Esercitazione : Addestramento del server dei parametri con Model.fit e ParameterServerStrategy .
  5. Tutorial : perfezionamento di BERT per molte attività dal benchmark GLUE con Model.fit e TPUStrategy .
  6. Repository TensorFlow Model Garden contenente raccolte di modelli all'avanguardia implementati utilizzando varie strategie.

Usa tf.distribute.Strategy con cicli di formazione personalizzati

Come dimostrato sopra, l'utilizzo di tf.distribute.Strategy con Keras Model.fit richiede la modifica solo di un paio di righe del codice. Con un piccolo sforzo in più, puoi anche usare tf.distribute.Strategy con cicli di formazione personalizzati .

Se hai bisogno di maggiore flessibilità e controllo sui tuoi cicli di allenamento rispetto a quanto è possibile con Estimator o Keras, puoi scrivere cicli di allenamento personalizzati. Ad esempio, quando si utilizza un GAN, potresti voler eseguire un numero diverso di passaggi del generatore o del discriminatore ogni round. Allo stesso modo, i quadri di alto livello non sono molto adatti per la formazione sull'apprendimento per rinforzo.

Le classi tf.distribute.Strategy forniscono un set base di metodi per supportare cicli di formazione personalizzati. L'utilizzo di questi potrebbe richiedere inizialmente una piccola ristrutturazione del codice, ma una volta eseguita questa operazione, dovresti essere in grado di passare da GPU, TPU e più macchine semplicemente modificando l'istanza della strategia.

Di seguito è riportato un breve snippet che illustra questo caso d'uso per un semplice esempio di addestramento utilizzando lo stesso modello Keras di prima.

Innanzitutto, crea il modello e l'ottimizzatore all'interno dell'ambito della strategia. Ciò garantisce che tutte le variabili create con il modello e l'ottimizzatore siano variabili speculari.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Quindi, crea il set di dati di input e chiama tf.distribute.Strategy.experimental_distribute_dataset per distribuire il set di dati in base alla strategia.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Quindi, definire una fase della formazione. Usa tf.GradientTape per calcolare i gradienti e l'ottimizzatore per applicare quei gradienti per aggiornare le variabili del tuo modello. Per distribuire questo passaggio di addestramento, inseriscilo in una funzione train_step e passalo a tf.distribute.Strategy.run insieme agli input del set di dati che hai ottenuto dal dist_dataset creato in precedenza:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Alcune altre cose da notare nel codice sopra:

  1. Hai usato tf.nn.compute_average_loss per calcolare la perdita. tf.nn.compute_average_loss somma la perdita per esempio e divide la somma per global_batch_size . Questo è importante perché in seguito, dopo che i gradienti sono stati calcolati su ciascuna replica, vengono aggregati tra le repliche sommandoli .
  2. Hai anche utilizzato l'API tf.distribute.Strategy.reduce per aggregare i risultati restituiti da tf.distribute.Strategy.run . tf.distribute.Strategy.run restituisce i risultati di ogni replica locale nella strategia e sono disponibili diversi modi per utilizzare questo risultato. Puoi reduce per ottenere un valore aggregato. Puoi anche eseguire tf.distribute.Strategy.experimental_local_results per ottenere l'elenco dei valori contenuti nel risultato, uno per replica locale.
  3. Quando si chiama apply_gradients all'interno di un ambito di strategia di distribuzione, il suo comportamento viene modificato. In particolare, prima di applicare i gradienti su ciascuna istanza parallela durante l'allenamento sincrono, esegue una somma totale delle repliche dei gradienti.

Infine, una volta definito il passaggio di addestramento, è possibile eseguire l'iterazione su dist_dataset ed eseguire l'addestramento in un ciclo:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32)
tf.Tensor(0.18628375, shape=(), dtype=float32)
tf.Tensor(0.18570684, shape=(), dtype=float32)
tf.Tensor(0.18513316, shape=(), dtype=float32)
tf.Tensor(0.1845627, shape=(), dtype=float32)
tf.Tensor(0.18399543, shape=(), dtype=float32)
tf.Tensor(0.18343134, shape=(), dtype=float32)
tf.Tensor(0.18287037, shape=(), dtype=float32)
tf.Tensor(0.18231256, shape=(), dtype=float32)
tf.Tensor(0.18175781, shape=(), dtype=float32)
tf.Tensor(0.18120615, shape=(), dtype=float32)
tf.Tensor(0.18065754, shape=(), dtype=float32)
tf.Tensor(0.18011193, shape=(), dtype=float32)
tf.Tensor(0.17956935, shape=(), dtype=float32)
tf.Tensor(0.17902976, shape=(), dtype=float32)
tf.Tensor(0.17849308, shape=(), dtype=float32)
tf.Tensor(0.17795937, shape=(), dtype=float32)
tf.Tensor(0.17742859, shape=(), dtype=float32)
tf.Tensor(0.17690066, shape=(), dtype=float32)
tf.Tensor(0.17637561, shape=(), dtype=float32)

Nell'esempio sopra, hai ripetuto il dist_dataset per fornire input alla tua formazione. Ti viene anche fornito il tf.distribute.Strategy.make_experimental_numpy_dataset per supportare gli input NumPy. Puoi utilizzare questa API per creare un set di dati prima di chiamare tf.distribute.Strategy.experimental_distribute_dataset .

Un altro modo per eseguire l'iterazione sui dati consiste nell'utilizzare esplicitamente gli iteratori. È possibile eseguire questa operazione quando si desidera eseguire un determinato numero di passaggi anziché eseguire l'iterazione sull'intero set di dati. L'iterazione precedente verrebbe ora modificata per creare prima un iteratore e quindi chiamarlo esplicitamente in next per ottenere i dati di input.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32)
tf.Tensor(0.17533402, shape=(), dtype=float32)
tf.Tensor(0.17481743, shape=(), dtype=float32)
tf.Tensor(0.17430364, shape=(), dtype=float32)
tf.Tensor(0.17379259, shape=(), dtype=float32)
tf.Tensor(0.17328428, shape=(), dtype=float32)
tf.Tensor(0.17277871, shape=(), dtype=float32)
tf.Tensor(0.17227581, shape=(), dtype=float32)
tf.Tensor(0.17177561, shape=(), dtype=float32)
tf.Tensor(0.17127804, shape=(), dtype=float32)

Questo copre il caso più semplice di utilizzo dell'API tf.distribute.Strategy per distribuire cicli di formazione personalizzati.

Cosa è supportato ora?

API di formazione MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Ciclo di allenamento personalizzato Supportato Supportato Supportato Supporto sperimentale Supporto sperimentale

Esempi e tutorial

Di seguito sono riportati alcuni esempi di utilizzo di strategie di distribuzione con cicli di formazione personalizzati:

  1. Esercitazione : Formazione con un ciclo di formazione personalizzato e MirroredStrategy .
  2. Tutorial : Formazione con un ciclo di formazione personalizzato e MultiWorkerMirroredStrategy .
  3. Guida : contiene un esempio di un ciclo di formazione personalizzato con TPUStrategy .
  4. Esercitazione : training del server dei parametri con un ciclo di training personalizzato e ParameterServerStrategy .
  5. Repository TensorFlow Model Garden contenente raccolte di modelli all'avanguardia implementati utilizzando varie strategie.

Altri argomenti

Questa sezione tratta alcuni argomenti rilevanti per più casi d'uso.

Impostazione della variabile di ambiente TF_CONFIG

Per la formazione multi-lavoratore, come accennato in precedenza, è necessario impostare la variabile di ambiente 'TF_CONFIG' per ogni binario in esecuzione nel cluster. La variabile di ambiente 'TF_CONFIG' è una stringa JSON che specifica quali attività costituiscono un cluster, i loro indirizzi e il ruolo di ciascuna attività nel cluster. Il tensorflow/ecosystem fornisce un modello Kubernetes, che imposta 'TF_CONFIG' per le tue attività di formazione.

Ci sono due componenti di 'TF_CONFIG' : un cluster e un'attività.

  • Un cluster fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavoro come i lavoratori. Nella formazione per più lavoratori, di solito c'è un lavoratore che si assume un po' più di responsabilità come il salvataggio del checkpoint e la scrittura di un file di riepilogo per TensorBoard oltre a ciò che fa un normale lavoratore. Tale lavoratore è indicato come lavoratore "capo", ed è consuetudine che il lavoratore con indice 0 sia nominato come lavoratore capo (infatti è così che viene implementato tf.distribute.Strategy ).
  • Un'attività d'altra parte fornisce informazioni sull'attività corrente. Il primo cluster di componenti è lo stesso per tutti i lavoratori e la seconda attività del componente è diversa per ogni lavoratore e specifica il tipo e l'indice di quel lavoratore.

Un esempio di 'TF_CONFIG' è:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Questo 'TF_CONFIG' specifica che ci sono tre worker e due task "ps" nel "cluster" insieme ai loro host e porte. La parte "task" specifica il ruolo dell'attività corrente nel "cluster" —lavoratore 1 (il secondo lavoratore). I ruoli validi in un cluster sono "chief" , "worker" , "ps" e "evaluator" . Non dovrebbe esserci alcun lavoro "ps" tranne quando si usa tf.distribute.experimental.ParameterServerStrategy .

Qual è il prossimo?

tf.distribute.Strategy è attivamente in fase di sviluppo. Provalo e fornisci il tuo feedback utilizzando i problemi di GitHub .