Aiuto proteggere la Grande Barriera Corallina con tensorflow sul Kaggle Join Sfida

Formazione multilavoratore con Keras

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza la fonte su GitHub Scarica il taccuino

Panoramica

Questo tutorial mostra come eseguire multi-operaio di formazione distribuito con un modello Keras e la Model.fit API utilizzando l' tf.distribute.Strategy API-specifico la tf.distribute.MultiWorkerMirroredStrategy di classe. Con l'aiuto di questa strategia, un modello Keras progettato per essere eseguito su un singolo lavoratore può funzionare senza problemi su più lavoratori con modifiche minime al codice.

Per chi è interessato a una più profonda comprensione di tf.distribute.Strategy API, la formazione Distribuito in tensorflow guida è disponibile per una panoramica delle strategie di distribuzione tensorflow supporti.

Per informazioni su come usare il MultiWorkerMirroredStrategy con Keras e un ciclo di formazione personalizzato, fare riferimento al ciclo di formazione personalizzata con Keras e MultiWorkerMirroredStrategy .

Si noti che lo scopo di questo tutorial è dimostrare un esempio minimo di più lavoratori con due lavoratori.

Impostare

Inizia con alcune importazioni necessarie:

import json
import os
import sys

Prima di importare TensorFlow, apportare alcune modifiche all'ambiente:

  1. Disabilita tutte le GPU. Ciò impedisce errori causati da tutti i lavoratori che cercano di utilizzare la stessa GPU. In un'applicazione del mondo reale, ogni lavoratore sarebbe su una macchina diversa.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Ripristinare il TF_CONFIG variabile d'ambiente (imparerete di più su questo più tardi):
os.environ.pop('TF_CONFIG', None)
  1. Assicurarsi che la directory corrente è di Python path-questo permette al notebook di importare i file scritti da %%writefile seguito:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Ora importa TensorFlow:

import tensorflow as tf

Dataset e definizione del modello

Successivamente, creare un mnist.py file con un modello semplice e set di dati di configurazione. Questo file Python verrà utilizzato dai processi di lavoro in questo tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Formazione modello su un singolo lavoratore

Prova il training del modello per un piccolo numero di epoche e osservare i risultati di un singolo lavoratore per fare in modo corretto che tutto funzioni. Con il progredire dell'addestramento, la perdita dovrebbe diminuire e la precisione dovrebbe aumentare.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

Configurazione multi-operaio

Ora entriamo nel mondo della formazione multilavoratore.

Un cluster con lavori e compiti

In tensorflow, formazione distribuita comporta: un 'cluster' con diversi posti di lavoro, e ciascuno dei posti di lavoro può avere uno o più 'task' s.

Sarà necessario il TF_CONFIG variabile di ambiente di configurazione per la formazione su più macchine, ognuna delle quali forse ha un ruolo diverso. TF_CONFIG è una stringa JSON utilizzato per specificare la configurazione del cluster per ogni lavoratore che fa parte del cluster.

Ci sono due componenti di un TF_CONFIG variabile: 'cluster' e 'task' .

  • Un 'cluster' è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavori, come 'worker' o 'chief' .

    • Nella formazione multi-lavoratore con tf.distribute.MultiWorkerMirroredStrategy , di solito c'è un 'worker' che assume le responsabilità, come ad esempio il salvataggio di un posto di blocco e la scrittura di un file di riepilogo per TensorBoard, in aggiunta a quello che un normale 'worker' fa. Tale 'worker' è indicato come il capo operaio (con un nome di processo 'chief' ).
    • E 'consuetudine che il 'chief' di avere 'index' 0 nominati a (in realtà, questo è il modo tf.distribute.Strategy è implementato).
  • Un 'task' fornisce informazioni dell'attività corrente ed è diversa per ogni lavoratore. E specifica il 'type' e 'index' di tale lavoratore.

Di seguito una configurazione di esempio:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ecco la stessa TF_CONFIG serializzato come stringa JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Si noti che tf_config è solo una variabile locale in Python. Per essere in grado di usarlo per una configurazione di formazione, questo dict deve essere serializzato come JSON e collocato in un TF_CONFIG variabile di ambiente.

Nella configurazione di esempio di cui sopra, si imposta il compito 'type' a 'worker' e il compito 'index' per 0 . Pertanto, questa macchina è il primo operaio. Sarà nominato come il 'chief' dei lavoratori e fare più lavoro rispetto agli altri.

A scopo illustrativo, Questo tutorial mostra come si può impostare una TF_CONFIG variabile con due lavoratori su localhost .

In pratica, si potrebbe creare più lavoratori su IP indirizzi / porte esterne e impostare un TF_CONFIG variabile ogni lavoratore di conseguenza.

In questo tutorial utilizzerai due lavoratori:

  • Il primo ( 'chief' ) del lavoratore TF_CONFIG è mostrato sopra.
  • Per il secondo operaio, verrà impostata tf_config['task']['index']=1

Variabili d'ambiente e sottoprocessi nei notebook

I sottoprocessi ereditano le variabili di ambiente dal genitore.

Ad esempio, puoi impostare una variabile di ambiente in questo processo Jupyter Notebook come segue:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Quindi, puoi accedere alla variabile di ambiente da un sottoprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Nella sezione successiva, che verrà utilizzato un metodo simile per passare il TF_CONFIG ai sottoprocessi lavoratori. In uno scenario reale, non avvieresti i tuoi lavori in questo modo, ma in questo esempio è sufficiente.

Scegli la giusta strategia

In TensorFlow, ci sono due forme principali di formazione distribuita:

  • Formazione sincrona, in cui vengono sincronizzate le fasi di formazione attraverso i lavoratori e le repliche, e
  • La formazione asincrona, in cui non sono strettamente sincronizzate le fasi di formazione (ad esempio, la formazione di server parametro ).

Questo tutorial mostra come eseguire sincrono di formazione multi-lavoratore utilizzando un'istanza di tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copie di tutte le variabili in strati del modello su ogni dispositivo su tutti i lavoratori. Esso utilizza CollectiveOps , un op tensorflow per la comunicazione collettiva, a gradienti di aggregare e mantenere le variabili in sincronia. La tf.distribute.Strategy guida ha più dettagli su questa strategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fornisce implementazioni multiple attraverso il CommunicationOptions parametro: 1) RING attrezzi collettivi anello a base utilizzando gRPC come strato di comunicazione cross-ospite; 2) NCCL utilizza la Libreria Collettiva comunicazione NVIDIA per implementare collettivi; e 3) AUTO rinvia la scelta per il runtime. La scelta migliore per l'implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster.

Allena il modello

Con l'integrazione di tf.distribute.Strategy API in tf.keras , l'unico cambiamento si farà distribuire la formazione a più di lavoro si racchiude la costruzione del modello e model.compile() chiamata all'interno strategy.scope() . Dettami campo di applicazione della strategia di distribuzione come e dove vengono create le variabili, e nel caso di MultiWorkerMirroredStrategy , le variabili create sono MirroredVariable s, e sono replicati su ciascuno dei lavoratori.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Per effettivamente eseguito con MultiWorkerMirroredStrategy sarà necessario eseguire i processi di lavoro e passare un TF_CONFIG a loro.

Come il mnist.py file scritto in precedenza, qui è la main.py che ciascuno dei lavoratori verrà eseguito:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Nel frammento di codice precedente nota che la global_batch_size , che viene passato a Dataset.batch , è impostato su per_worker_batch_size * num_workers . Questo assicura che ciascun lavoratore elabora lotti di per_worker_batch_size esempi indipendentemente dal numero di lavoratori.

La directory corrente ora contiene entrambi i file Python:

ls *.py
main.py
mnist.py

Così JSON-serializzare l' TF_CONFIG e aggiungerlo alle variabili d'ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora, è possibile avviare un processo di lavoro che verrà eseguito il main.py e utilizzare il TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ci sono alcune cose da notare sul comando precedente:

  1. Esso utilizza la %%bash , che è un notebook "magico" per eseguire alcuni comandi bash.
  2. Esso utilizza il --bg bandiera per eseguire il bash processo in background, perché questo lavoratore non terminerà. Aspetta tutti i lavoratori prima di iniziare.

Il processo di lavoro in background non stamperà in uscita a questo notebook, in modo che il &> redirige l'output ad un file in modo che è possibile controllare ciò che è accaduto in un file di log dopo.

Quindi, attendi qualche secondo per l'avvio del processo:

import time
time.sleep(10)

Ora, controlla cosa è stato emesso finora nel file di registro del lavoratore:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

L'ultima riga del file di registro dovrebbe dire: Started server with target: grpc://localhost:12345 . Il primo lavoratore è ora pronto e sta aspettando che tutti gli altri lavoratori siano pronti per procedere.

Quindi aggiornare il tf_config per il processo del secondo lavoratore di prendere:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Avvia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire questo processo in background):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

Se ricontrolla i log scritti dal primo lavoratore, scoprirai che ha partecipato alla formazione di quel modello:

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

Non sorprende, questo corse più lento rispetto al test eseguito all'inizio di questo tutorial.

L'esecuzione di più lavoratori su una singola macchina aggiunge solo un sovraccarico.

L'obiettivo qui non era quello di migliorare il tempo di formazione, ma solo di dare un esempio di formazione multi-lavoratore.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formazione approfondita multi-lavoratore

Finora, hai imparato come eseguire una configurazione di base per più lavoratori.

Durante il resto del tutorial, imparerai in dettaglio altri fattori, che possono essere utili o importanti per casi d'uso reali.

Condivisione del set di dati

Nella formazione multi-lavoratore, è necessaria dataset sharding per garantire la convergenza e le prestazioni.

Nell'esempio della sezione precedente si basa sulla autosharding predefinito fornito dal tf.distribute.Strategy API. È possibile controllare lo sharding impostando il tf.data.experimental.AutoShardPolicy dei tf.data.experimental.DistributeOptions .

Per ulteriori informazioni su auto-sharding, fare riferimento alla guida di ingresso distribuita .

Ecco un rapido esempio di come trasformare l'auto sharding fuori, in modo che ogni replica elabora tutti gli esempi (non raccomandato):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Valutazione

Se si passa il validation_data in Model.fit , si alternerà tra la formazione e valutazione per ogni epoca. La valutazione prendendo il validation_data è distribuito in tutto lo stesso insieme dei lavoratori e dei risultati della valutazione sono aggregati e disponibili per tutti i lavoratori.

Analogamente all'addestramento, il set di dati di convalida viene suddiviso automaticamente a livello di file. È necessario impostare una dimensione del lotto globale nel set di dati di validazione e impostare le validation_steps .

Si raccomanda anche un set di dati ripetuto per la valutazione.

In alternativa, puoi anche creare un'altra attività che legge periodicamente i checkpoint ed esegue la valutazione. Questo è ciò che fa lo stimatore. Ma questo non è un modo consigliato per eseguire la valutazione e quindi i suoi dettagli vengono omessi.

Prestazione

Si dispone ora di un modello Keras che è tutto pronto per l'esecuzione in più lavoratori con il MultiWorkerMirroredStrategy .

Per ottimizzare le prestazioni della formazione multi-lavoratore, puoi provare quanto segue:

  • tf.distribute.MultiWorkerMirroredStrategy fornisce molteplici implementazioni di comunicazione collettiva :

    • RING collettivi basati anello implementa utilizzando gRPC come strato di comunicazione cross-ospitante.
    • NCCL utilizza il Collettivo Biblioteca comunicazione NVIDIA per implementare collettivi.
    • AUTO rinvia la scelta per il runtime.

    La scelta migliore per l'implementazione collettiva dipende dal numero di GPU, dal tipo di GPU e dall'interconnessione di rete nel cluster. Per ignorare la scelta automatica, specificare il communication_options parametro MultiWorkerMirroredStrategy costruttore 's. Per esempio:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Fusioni le variabili tf.float , se possibile:

    • Il modello ufficiale RESNET include un esempio di come questo può essere fatto.

Tolleranza ai guasti

Nell'addestramento sincrono, il cluster fallirebbe se uno dei lavoratori si guasta e non esiste alcun meccanismo di ripristino degli errori.

Utilizzando Keras con tf.distribute.Strategy viene fornito con il vantaggio di tolleranza ai guasti nei casi in cui muoiono i lavoratori o altrimenti instabili. Puoi farlo preservando lo stato di addestramento nel file system distribuito di tua scelta, in modo tale che al riavvio dell'istanza che in precedenza ha avuto esito negativo o interrotto, lo stato di addestramento viene ripristinato.

Quando un lavoratore diventa non disponibile, gli altri lavoratori falliranno (possibilmente dopo un timeout). In tali casi, il lavoratore non disponibile deve essere riavviato, così come gli altri lavoratori che hanno fallito.

Richiamata ModelCheckpoint

ModelCheckpoint callback non fornisce più funzionalità di tolleranza di errore, si prega di utilizzare BackupAndRestore invece richiamata.

Il ModelCheckpoint richiamata può ancora essere utilizzato per salvare posti di blocco. Ma con questo, se l'addestramento è stato interrotto o terminato con successo, per continuare l'addestramento dal punto di controllo, l'utente è responsabile di caricare manualmente il modello.

Opzionalmente l'utente può scegliere di salvare e ripristinare modello / pesi fuori ModelCheckpoint callback.

Salvataggio e caricamento del modello

Per salvare il modello utilizzando model.save o tf.saved_model.save , le esigenze destinazione di salvataggio per essere diverso per ogni lavoratore.

  • Per i lavoratori non capo, sarà necessario salvare il modello in una directory temporanea.
  • Per il capo, dovrai salvare nella directory del modello fornita.

Le directory temporanee sul lavoratore devono essere univoche per evitare errori risultanti da più lavoratori che tentano di scrivere nella stessa posizione.

Il modello salvato in tutte le directory è identico e in genere si dovrebbe fare riferimento solo al modello salvato dal capo per il ripristino o il servizio.

Dovresti avere una logica di pulizia che elimini le directory temporanee create dai lavoratori una volta completata la formazione.

Il motivo per risparmiare contemporaneamente sul capo e sui lavoratori è perché potresti aggregare le variabili durante il checkpoint, il che richiede che sia il capo che i lavoratori partecipino al protocollo di comunicazione allreduce. D'altro canto, consentire a capo e lavoratori di salvare nella stessa directory del modello si tradurrà in errori a causa della contesa.

Utilizzando il MultiWorkerMirroredStrategy , il programma viene eseguito su ogni lavoratore, e al fine di sapere se il lavoratore corrente è il capo, sfrutta l'oggetto resolver cluster che ha gli attributi task_type e task_id :

  • task_type ti dice quello che il lavoro corrente (ad esempio 'worker' ).
  • task_id ti dice l'identificativo del lavoratore.
  • Il lavoratore con task_id == 0 è designato come il capo operaio.

Nel frammento di codice seguente, la write_filepath funzione fornisce il percorso del file di scrittura, che dipende dalla del lavoratore task_id :

  • Per il capo operaio (con task_id == 0 ), scrive per il percorso del file originale.
  • Per gli altri lavoratori, si crea una DIRECTORY temporanea temp_dir -con il task_id nel percorso di directory di scrivere in:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con ciò, ora sei pronto per salvare:

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Come descritto sopra, in seguito il modello dovrebbe essere caricato solo dal percorso in cui è stato salvato il capo, quindi rimuoviamo quelli temporanei salvati dai non capi:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ora, quando è il momento di carico, usiamo conveniente tf.keras.models.load_model API, e proseguire con ulteriori lavori.

Qui, assumere solo con singolo lavoratore di carico e di continuare la formazione, nel qual caso non si chiama tf.keras.models.load_model all'interno di un altro strategy.scope() (nota che strategy = tf.distribute.MultiWorkerMirroredStrategy() , come definito in precedenza ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

Salvataggio e ripristino del checkpoint

D'altra parte, il checkpoint ti consente di salvare i pesi del tuo modello e ripristinarli senza dover salvare l'intero modello.

Qui, si creerà uno tf.train.Checkpoint che traccia il modello, che è gestito dalla tf.train.CheckpointManager , in modo che solo l'ultimo checkpoint è conservato:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una volta che il CheckpointManager è impostato, ora siete pronti a salvare e rimuovere i posti di blocco i lavoratori non era stata risparmiata principali:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ora, quando è necessario ripristinare il modello, è possibile trovare le ultime checkpoint salvato utilizzando la comoda tf.train.latest_checkpoint funzioni. Dopo aver ripristinato il checkpoint, puoi continuare con l'addestramento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

Backup e ripristino di richiamata

Il tf.keras.callbacks.experimental.BackupAndRestore callback fornisce la funzionalità tolleranza ai guasti eseguendo il backup del modello e il numero epoca corrente in un file di checkpoint temporanea ai sensi backup_dir argomento BackupAndRestore . Questo viene fatto alla fine di ogni epoca.

Una volta che i lavori vengono interrotti e riavviati, la richiamata ripristina l'ultimo checkpoint e l'addestramento continua dall'inizio dell'epoca interrotta. Qualsiasi addestramento parziale già fatto nell'epoca incompiuta prima dell'interruzione verrà buttato via, in modo che non influisca sullo stato finale del modello.

Per usarlo, fornire un esempio di tf.keras.callbacks.experimental.BackupAndRestore al Model.fit chiamata.

Con MultiWorkerMirroredStrategy , se un lavoratore viene interrotto, l'intero cluster interrompe fino al riavvio del lavoratore interrotto. Verranno riavviati anche altri lavoratori e il lavoratore interrotto si ricongiunge al cluster. Quindi, ogni lavoratore legge il file del checkpoint che è stato precedentemente salvato e riprende il suo stato precedente, consentendo così al cluster di tornare sincronizzato. Poi, la formazione continua.

Il BackupAndRestore richiamata utilizza il CheckpointManager per salvare e ripristinare lo stato di formazione, che genera un file chiamato checkpoint che le tracce posti di blocco esistente con quello più recente. Per questo motivo, backup_dir non dovrebbe essere ri-utilizzato per memorizzare altri posti di blocco al fine di evitare la collisione nome.

Attualmente, il BackupAndRestore callback supporta singolo lavoratore con nessuna strategia, MirroredStrategy, e multi-lavoratore con MultiWorkerMirroredStrategy. Di seguito sono riportati due esempi sia per la formazione multi-lavoratore che per la formazione per singolo lavoratore.

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

Se si controlla la directory di backup_dir specificata nel BackupAndRestore , è possibile notare alcuni file del checkpoint generati temporaneamente. Questi file sono necessari per recuperare le istanze precedentemente perso, e saranno rimossi dalla libreria alla fine del Model.fit All'uscita successo della vostra formazione.

Risorse addizionali

  1. La formazione Distribuito in tensorflow guida fornisce una panoramica delle strategie di distribuzione disponibili.
  2. Il ciclo di formazione personalizzata con Keras e MultiWorkerMirroredStrategy spettacoli tutorial su come utilizzare il MultiWorkerMirroredStrategy con Keras e un ciclo di formazione personalizzato.
  3. Scopri i modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
  4. La prestazione migliore con tf.function guida fornisce informazioni su altre strategie e strumenti, come ad esempio il tensorflow Profiler è possibile utilizzare per ottimizzare le prestazioni dei modelli tensorflow.