Salva la data! Google I / O ritorna dal 18 al 20 maggio Registrati ora
Questa pagina è stata tradotta dall'API Cloud Translation.
Switch to English

Formazione multi-lavoratore con Keras

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza sorgente su GitHub Scarica taccuino

Panoramica

Questo tutorial mostra l'addestramento distribuito multi-worker con il modello Keras utilizzando tf.distribute.Strategy API tf.distribute.Strategy , in particolare tf.distribute.MultiWorkerMirroredStrategy . Con l'aiuto di questa strategia, un modello Keras progettato per essere eseguito su un singolo lavoratore può funzionare senza problemi su più lavoratori con una modifica minima del codice.

La guida Distributed Training in TensorFlow è disponibile per una panoramica delle strategie di distribuzione supportate da TensorFlow per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy .

Impostare

Innanzitutto, alcune importazioni necessarie.

import json
import os
import sys

Prima di importare TensorFlow, apporta alcune modifiche all'ambiente.

Disabilita tutte le GPU. Ciò previene gli errori causati dai lavoratori che tentano di utilizzare la stessa GPU. Per un'applicazione reale ogni lavoratore si troverebbe su una macchina diversa.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Reimposta la variabile d'ambiente TF_CONFIG , ne vedrai di più in seguito.

os.environ.pop('TF_CONFIG', None)

Assicurati che la directory corrente sia sul percorso di Python. Ciò consente al notebook di importare i file scritti da %%writefile secondo momento.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ora importa TensorFlow.

import tensorflow as tf

Set di dati e definizione del modello

Quindi creare un file mnist.py con un modello semplice e una configurazione del set di dati. Questo file python verrà utilizzato dai processi di lavoro in questo tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Prova ad addestrare il modello per un numero limitato di epoche e osserva i risultati di un singolo lavoratore per assicurarti che tutto funzioni correttamente. Man mano che l'allenamento progredisce, la perdita dovrebbe diminuire e la precisione dovrebbe aumentare.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2959 - accuracy: 0.0977
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2311 - accuracy: 0.2726
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1668 - accuracy: 0.4236
<tensorflow.python.keras.callbacks.History at 0x7f62c6ec0780>

Configurazione multi-lavoratore

Entriamo ora nel mondo della formazione multi-lavoratore. In TensorFlow, la variabile di ambiente TF_CONFIG è richiesta per l'addestramento su più macchine, ognuna delle quali ha eventualmente un ruolo diverso. TF_CONFIG è una stringa JSON utilizzata per specificare la configurazione del cluster su ogni worker che fa parte del cluster.

Ecco una configurazione di esempio:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ecco lo stesso TF_CONFIG serializzato come stringa JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Ci sono due componenti di TF_CONFIG : cluster e task .

  • cluster è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavori come il worker . Nella formazione multi-lavoratore con MultiWorkerMirroredStrategy , di solito c'è un worker che si assume un po 'più di responsabilità come il salvataggio del checkpoint e la scrittura del file di riepilogo per TensorBoard oltre a ciò che fa un worker normale. Tale lavoratore è indicato come il lavoratore chief , ed è consuetudine che il worker con index 0 sia nominato come il worker principale (in effetti è così tf.distribute.Strategy viene implementato tf.distribute.Strategy ).

  • task fornisce informazioni sull'attività corrente ed è diverso per ogni lavoratore. Specifica il type e l' index di quel lavoratore.

In questo esempio, imposti il type attività su "worker" e l' index su 0 . Questa macchina è il primo operaio e sarà nominato capo operaio e farà più lavoro degli altri. Si noti che anche altre macchine dovranno avere la variabile d'ambiente TF_CONFIG impostata e dovrebbe avere lo stesso dict del cluster , ma un type attività o un index attività diversi a seconda dei ruoli di quelle macchine.

A scopo illustrativo, questo tutorial mostra come si può impostare un TF_CONFIG con 2 worker su localhost . In pratica, gli utenti creerebbero più worker su indirizzi IP / porte esterni e imposterebbero TF_CONFIG su ogni worker in modo appropriato.

In questo esempio utilizzerai 2 lavoratori, il TF_CONFIG del primo lavoratore è mostrato sopra. Per il secondo worker dovresti impostare tf_config['task']['index']=1

Sopra, tf_config è solo una variabile locale in python. Per usarlo effettivamente per configurare l'addestramento, questo dizionario deve essere serializzato come JSON e inserito nella variabile d'ambiente TF_CONFIG .

Variabili di ambiente e sottoprocessi nei notebook

I sottoprocessi ereditano le variabili di ambiente dal loro genitore. Quindi, se imposti una variabile di ambiente in questo processo del jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

È possibile accedere alla variabile di ambiente da un sottoprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Nella sezione successiva, lo userai per passare TF_CONFIG ai sottoprocessi di lavoro. Non avvieresti mai i tuoi lavori in questo modo, ma è sufficiente per gli scopi di questo tutorial: per dimostrare un esempio minimo di multi-worker.

Scegli la giusta strategia

In TensorFlow esistono due forme principali di formazione distribuita:

  • Formazione sincrona, in cui i passaggi della formazione sono sincronizzati tra i lavoratori e le repliche, e
  • Formazione asincrona, in cui le fasi di addestramento non sono strettamente sincronizzate.

MultiWorkerMirroredStrategy , che è la strategia consigliata per la formazione sincrona multi-lavoratore, sarà dimostrata in questa guida. Per addestrare il modello, utilizza un'istanza di tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copie di tutte le variabili nei livelli del modello su ogni dispositivo su tutti i worker. Utilizza CollectiveOps , un'opzione TensorFlow per la comunicazione collettiva, per aggregare i gradienti e mantenere sincronizzate le variabili. La guida tf.distribute.Strategy contiene maggiori dettagli su questa strategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fornisce più implementazioni tramite il parametro CommunicationOptions . RING implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione tra host. NCCL utilizza NCCL Nvidia per implementare i collettivi. AUTO rimanda la scelta al runtime. La scelta migliore per l'implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster.

Addestra il modello

Con l'integrazione dell'API tf.distribute.Strategy in tf.keras , l'unica modifica che apporterai per distribuire la formazione a più lavoratori è racchiudere la creazione del modello e la chiamata model.compile() all'interno di strategy.scope() . L'ambito della strategia di distribuzione determina come e dove vengono create le variabili e, nel caso di MultiWorkerMirroredStrategy , le variabili create sono MirroredVariable s e vengono replicate su ciascuno dei worker.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Per eseguire effettivamente MultiWorkerMirroredStrategy è necessario eseguire i processi di lavoro e passare loro un TF_CONFIG .

Come il file mnist.py scritto in precedenza, ecco il main.py che ciascuno dei worker eseguirà:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Nello snippet di codice sopra, si noti che global_batch_size , che viene passato a Dataset.batch , è impostato su per_worker_batch_size * num_workers . Ciò garantisce che ogni lavoratore per_worker_batch_size batch di esempi per_worker_batch_size indipendentemente dal numero di lavoratori.

La directory corrente ora contiene entrambi i file Python:

ls *.py
main.py
mnist.py

Quindi json serializza TF_CONFIG e aggiungilo alle variabili d'ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora puoi avviare un processo di lavoro che eseguirà main.py e utilizzare TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ci sono alcune cose da notare sul comando precedente:

  1. Usa il %%bash che è una "magia" del notebook per eseguire alcuni comandi bash.
  2. Usa il flag --bg per eseguire il processo bash in background, perché questo worker non terminerà. Aspetta tutti i lavoratori prima di iniziare.

Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &> reindirizza il suo output a un file, così puoi vedere cosa è successo.

Quindi, attendi qualche secondo affinché il processo si avvii:

import time
time.sleep(10)

Ora guarda cosa è stato emesso finora nel file di log del worker:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

L'ultima riga del file di registro dovrebbe indicare: Started server with target: grpc://localhost:12345 . Il primo lavoratore è ora pronto e attende che tutti gli altri lavoratori siano pronti a procedere.

Quindi aggiorna il tf_config per il processo del secondo lavoratore da raccogliere:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora avvia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire questo processo in background):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511
2021-02-23 02:20:43.794926: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:45.375845: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:45.376779: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:46.347650: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:46.347716: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347726: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347847: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:46.347887: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:46.347898: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:46.348715: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:46.349096: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.349700: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.353497: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:46.353936: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-02-23 02:20:47.285814: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.507974: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508360: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz

Ora se ricontrolli i log scritti dal primo lavoratore vedrai che ha partecipato alla formazione di quel modello:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-02-23 02:20:47.286117: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.508657: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508964: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

Non sorprende che questo sia stato più lento del test eseguito all'inizio di questo tutorial. L'esecuzione di più lavoratori su una singola macchina aggiunge solo un sovraccarico. L'obiettivo qui non era quello di migliorare il tempo di formazione, ma solo di fornire un esempio di formazione multi-lavoratore.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formazione multi-lavoratore in profondità

Finora questo tutorial ha dimostrato una configurazione multi-worker di base. Il resto di questo documento esamina in dettaglio altri fattori che possono essere utili o importanti per casi d'uso reali.

Partizionamento orizzontale del set di dati

Nella formazione multi-worker, è necessario lo sharding del set di dati per garantire convergenza e prestazioni.

L'esempio nella sezione precedente si basa sull'autosharding predefinito fornito dall'API tf.distribute.Strategy . È possibile controllare lo sharding impostando tf.data.experimental.AutoShardPolicy di tf.data.experimental.DistributeOptions . Per ulteriori informazioni sul partizionamento automatico, vedere la Guida all'input distribuito .

Ecco un rapido esempio di come disattivare l'auto sharding, in modo che ogni replica elabori ogni esempio (non consigliato):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Valutazione

Se si passa validation_data a model.fit , si alternerà l'addestramento e la valutazione per ogni epoca. La valutazione che prende validation_data viene distribuita sullo stesso insieme di lavoratori ei risultati della valutazione sono aggregati e disponibili per tutti i lavoratori. Analogamente all'addestramento, il set di dati di convalida viene suddiviso automaticamente a livello di file. È necessario impostare una dimensione batch globale nel set di dati di convalida e impostare validation_steps . Per la valutazione si consiglia anche un set di dati ripetuto.

In alternativa, puoi anche creare un'altra attività che legga periodicamente i checkpoint ed esegua la valutazione. Questo è ciò che fa Estimator. Ma questo non è un modo consigliato per eseguire la valutazione e quindi i suoi dettagli vengono omessi.

Prestazione

Ora hai un modello Keras tutto configurato per essere eseguito su più worker con MultiWorkerMirroredStrategy . Puoi provare le seguenti tecniche per modificare le prestazioni della formazione multi-lavoratore con MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy fornisce più implementazioni di comunicazione collettiva . RING implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione tra host. NCCL utilizza NCCL Nvidia per implementare i collettivi. AUTO rimanda la scelta al runtime. La scelta migliore per l'implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster. Per ignorare la scelta automatica, specificare communication_options parametro del MultiWorkerMirroredStrategy costruttore 's, ad esempio communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • tf.float le variabili a tf.float se possibile. Il modello ResNet ufficiale include un esempio di come questo può essere fatto.

Tolleranza ai guasti

Nell'addestramento sincrono, il cluster fallirebbe se uno dei worker si guasta e non esiste alcun meccanismo di ripristino in caso di errore. L'utilizzo di Keras con tf.distribute.Strategy offre il vantaggio della tolleranza ai guasti nei casi in cui i lavoratori muoiono o sono altrimenti instabili. Puoi farlo preservando lo stato di addestramento nel file system distribuito di tua scelta, in modo tale che al riavvio dell'istanza che in precedenza non era riuscita o era stata interrotta, lo stato di addestramento viene ripristinato.

Quando un lavoratore diventa non disponibile, altri lavoratori falliranno (possibilmente dopo un timeout). In questi casi, il lavoratore non disponibile deve essere riavviato, così come gli altri lavoratori che hanno fallito.

Richiamata ModelCheckpoint

ModelCheckpoint callback ModelCheckpoint non fornisce più la funzionalità di tolleranza agli errori, utilizzare invece il callback BackupAndRestore .

Il callback ModelCheckpoint può ancora essere utilizzato per salvare i checkpoint. Ma con questo, se l'addestramento è stato interrotto o terminato con successo, per continuare l'addestramento dal checkpoint, l'utente è responsabile di caricare manualmente il modello.

Facoltativamente, l'utente può scegliere di salvare e ripristinare modelli / pesi al di fuori del callback ModelCheckpoint .

Salvataggio e caricamento del modello

Per salvare il modello utilizzando model.save o tf.saved_model.save , la destinazione del salvataggio deve essere diversa per ogni lavoratore. Sui lavoratori non capi, dovrai salvare il modello in una directory temporanea e sul capo, dovrai salvare nella directory del modello fornita. Le directory temporanee sul lavoratore devono essere univoche per evitare errori derivanti da più lavoratori che tentano di scrivere nella stessa posizione. Il modello salvato in tutte le directory è identico e in genere solo il modello salvato dal capo dovrebbe essere referenziato per il ripristino o la pubblicazione. Dovresti avere una logica di pulizia che elimina le directory temporanee create dai lavoratori una volta completata la formazione.

Il motivo per cui devi risparmiare contemporaneamente sul capo e sui lavoratori è perché potresti aggregare variabili durante il checkpoint, il che richiede che sia il capo che i lavoratori partecipino al protocollo di comunicazione allreduce. D'altra parte, lasciare che il capo e i lavoratori salvano nella stessa directory del modello comporterà errori dovuti a contese.

Con MultiWorkerMirroredStrategy , il programma viene eseguito su ogni lavoratore e, per sapere se il lavoratore corrente è il capo, sfrutta l'oggetto resolver del cluster che ha attributi task_type e task_id . task_type ti dice qual è il lavoro corrente (es. 'worker') e task_id ti dice l'identificatore del lavoratore. Il lavoratore con ID 0 è designato come capo lavoratore.

Nello snippet di codice riportato di seguito, write_filepath fornisce il percorso del file da scrivere, che dipende write_filepath worker. Nel caso di chief (worker con id 0), scrive nel percorso del file originale; per altri, crea una directory temporanea (con id nel percorso della directory) in cui scrivere:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con questo, ora sei pronto per salvare:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Come descritto sopra, in seguito il modello dovrebbe essere caricato solo dal percorso di salvataggio del capo, quindi togliamo quelli temporanei che i lavoratori non capi hanno salvato:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ora, quando è il momento di caricare, usiamo la comoda API tf.keras.models.load_model e continuiamo con il lavoro ulteriore. Qui, supponi di utilizzare solo un singolo worker per caricare e continuare l'addestramento, nel qual caso non chiami tf.keras.models.load_model all'interno di un altro strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 13ms/step - loss: 2.3041 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2873 - accuracy: 0.0023
<tensorflow.python.keras.callbacks.History at 0x7f62c4ef5048>

Salvataggio e ripristino del checkpoint

D'altra parte, il checkpoint consente di salvare i pesi del modello e ripristinarli senza dover salvare l'intero modello. Qui creerai un tf.train.Checkpoint che tiene traccia del modello, che è gestito da un tf.train.CheckpointManager modo che venga conservato solo l'ultimo checkpoint.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una volta configurato CheckpointManager , sei pronto per salvare e rimuovere i checkpoint non-chief worker salvati.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ora, quando è necessario eseguire il ripristino, è possibile trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint . Dopo aver ripristinato il checkpoint, puoi continuare con l'addestramento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.3050 - accuracy: 0.0920
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2963 - accuracy: 0.0896
<tensorflow.python.keras.callbacks.History at 0x7f62c44a2710>

Richiamata BackupAndRestore

Il callback di BackupAndRestore fornisce funzionalità di tolleranza agli errori, eseguendo il backup del modello e del numero di epoca corrente in un file di checkpoint temporaneo sotto backup_dir argomento BackupAndRestore su BackupAndRestore . Questo viene fatto alla fine di ogni epoca.

Una volta che i lavori vengono interrotti e riavviati, il callback ripristina l'ultimo checkpoint e l'addestramento continua dall'inizio dell'epoca interrotta. Qualsiasi addestramento parziale già svolto nell'epoca incompiuta prima dell'interruzione verrà buttato via, in modo che non influisca sullo stato finale del modello.

Per utilizzarlo, fornire un'istanza di tf.keras.callbacks.experimental.BackupAndRestore alla chiamata tf.keras.Model.fit() .

Con MultiWorkerMirroredStrategy, se un lavoratore viene interrotto, l'intero cluster viene sospeso finché il lavoratore interrotto non viene riavviato. Anche gli altri worker verranno riavviati e il worker interrotto si ricongiungerà al cluster. Quindi, ogni worker legge il file del checkpoint precedentemente salvato e riprende il suo stato precedente, consentendo così al cluster di tornare sincronizzato. Quindi la formazione continua.

BackupAndRestore callback di BackupAndRestore utilizza CheckpointManager per salvare e ripristinare lo stato di addestramento, che genera un file chiamato checkpoint che tiene traccia dei checkpoint esistenti insieme a quello più recente. Per questo motivo, backup_dir non dovrebbe essere riutilizzato per memorizzare altri checkpoint al fine di evitare backup_dir nome.

Attualmente, la richiamata BackupAndRestore supporta un singolo lavoratore senza strategia, MirroredStrategy e multi-worker con MultiWorkerMirroredStrategy. Di seguito sono riportati due esempi sia per la formazione multi-lavoratore che per la formazione per singolo lavoratore.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2930 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2467 - accuracy: 0.2765
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1963 - accuracy: 0.3645
<tensorflow.python.keras.callbacks.History at 0x7f62c4371390>

Se controlli la directory di backup_dir hai specificato in BackupAndRestore , potresti notare alcuni file di checkpoint generati temporaneamente. Questi file sono necessari per ripristinare le istanze perse in precedenza e verranno rimossi dalla libreria alla fine di tf.keras.Model.fit() dopo l'uscita dalla formazione.

Guarda anche

  1. La guida Distributed Training in TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
  2. Modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
  3. La sezione Prestazioni nella guida fornisce informazioni su altre strategie e strumenti che è possibile utilizzare per ottimizzare le prestazioni dei modelli TensorFlow.