Questa pagina è stata tradotta dall'API Cloud Translation.
Switch to English

Formazione multi-lavoratore con Keras

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza sorgente su GitHub Scarica notebook

Panoramica

Questo tutorial mostra l'addestramento distribuito multi-worker con il modello Keras utilizzando tf.distribute.Strategy API tf.distribute.Strategy , in particolare tf.distribute.experimental.MultiWorkerMirroredStrategy . Con l'aiuto di questa strategia, un modello Keras progettato per essere eseguito su un singolo lavoratore può funzionare senza problemi su più lavoratori con una modifica minima del codice.

La guida Distributed Training in TensorFlow è disponibile per una panoramica delle strategie di distribuzione supportate da TensorFlow per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy .

Impostare

Innanzitutto, alcune importazioni necessarie.

import json
import os
import sys

Prima di importare TensorFlow, apporta alcune modifiche all'ambiente.

Disabilita tutte le GPU. Ciò impedisce errori causati dai lavoratori che tentano tutti di utilizzare la stessa GPU. Per un'applicazione reale ogni lavoratore si troverebbe su una macchina diversa.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Reimposta la variabile d'ambiente TF_CONFIG , ne vedrai di più in seguito.

os.environ.pop('TF_CONFIG', None)

Assicurati che la directory corrente sia sul percorso di Python. Ciò consente al notebook di importare i file scritti da %%writefile secondo momento.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ora importa TensorFlow.

import tensorflow as tf

Set di dati e definizione del modello

Quindi crea un file mnist.py con un modello semplice e una configurazione del set di dati. Questo file python verrà utilizzato dai processi di lavoro in questo tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Prova ad addestrare il modello per un numero limitato di epoche e osserva i risultati di un singolo lavoratore per assicurarti che tutto funzioni correttamente. Man mano che l'allenamento progredisce, la perdita dovrebbe diminuire e la precisione dovrebbe aumentare.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 14ms/step - loss: 2.2700 - accuracy: 0.2304
Epoch 2/3
70/70 [==============================] - 1s 14ms/step - loss: 2.1825 - accuracy: 0.4569
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.0803 - accuracy: 0.5958

<tensorflow.python.keras.callbacks.History at 0x7fde81f99160>

Configurazione multi-lavoratore

Entriamo ora nel mondo della formazione multi-lavoratore. In TensorFlow, la variabile di ambiente TF_CONFIG è richiesta per l'addestramento su più macchine, ognuna delle quali ha eventualmente un ruolo diverso. TF_CONFIG è una stringa JSON utilizzata per specificare la configurazione del cluster su ogni worker che fa parte del cluster.

Ecco una configurazione di esempio:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ecco lo stesso TF_CONFIG serializzato come stringa JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Ci sono due componenti di TF_CONFIG : cluster e task .

  • cluster è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavori come il worker . Nella formazione multi-lavoratore con MultiWorkerMirroredStrategy , di solito c'è un worker che si assume un po 'più di responsabilità come salvare il checkpoint e scrivere un file di riepilogo per TensorBoard oltre a ciò che fa un worker normale. Tale lavoratore è indicato come il lavoratore chief , ed è consuetudine che il worker con index 0 sia nominato come worker principale (in effetti è così tf.distribute.Strategy viene implementato tf.distribute.Strategy ).

  • task fornisce informazioni sull'attività corrente ed è diverso per ogni lavoratore. Specifica il type e l' index di quel lavoratore.

In questo esempio, imposti il type attività su "worker" e l' index su 0 . Questa macchina è il primo operaio e sarà nominato capo operaio e farà più lavoro degli altri. Notare che anche altre macchine dovranno avere la variabile d'ambiente TF_CONFIG impostata, e dovrebbe avere lo stesso dict del cluster , ma diverso type attività o index attività a seconda dei ruoli di quelle macchine.

A scopo illustrativo, questo tutorial mostra come si può impostare un TF_CONFIG con 2 worker su localhost . In pratica, gli utenti creerebbero più worker su indirizzi IP / porte esterni e imposterebbero TF_CONFIG su ciascun worker in modo appropriato.

In questo esempio utilizzerai 2 lavoratori, TF_CONFIG del primo lavoratore è mostrato sopra. Per il secondo worker dovresti impostare tf_config['task']['index']=1

Sopra, tf_config è solo una variabile locale in python. Per usarlo effettivamente per configurare l'addestramento, questo dizionario deve essere serializzato come JSON e inserito nella variabile d'ambiente TF_CONFIG .

Variabili di ambiente e sottoprocessi nei notebook

I sottoprocessi ereditano le variabili di ambiente dal loro genitore. Quindi, se imposti una variabile di ambiente in questo processo del jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

È possibile accedere alla variabile di ambiente da un sottoprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Nella sezione successiva, lo userai per passare TF_CONFIG ai sottoprocessi di lavoro. Non avvieresti mai i tuoi lavori in questo modo, ma è sufficiente per gli scopi di questo tutorial: per dimostrare un esempio minimo di multi-worker.

Scegli la strategia giusta

In TensorFlow esistono due forme principali di formazione distribuita:

  • Formazione sincrona, in cui i passaggi della formazione sono sincronizzati tra i lavoratori e le repliche, e
  • Formazione asincrona, in cui le fasi di addestramento non sono strettamente sincronizzate.

MultiWorkerMirroredStrategy , che è la strategia consigliata per la formazione sincrona multi-lavoratore, verrà dimostrata in questa guida. Per addestrare il modello, utilizza un'istanza di tf.distribute.experimental.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copie di tutte le variabili nei livelli del modello su ogni dispositivo su tutti i worker. Utilizza CollectiveOps , un'opzione TensorFlow per la comunicazione collettiva, per aggregare i gradienti e mantenere sincronizzate le variabili. La guida tf.distribute.Strategy contiene maggiori dettagli su questa strategia.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy fornisce più implementazioni tramite il parametro CollectiveCommunication . RING implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione tra host. NCCL utilizza NCCL Nvidia per implementare i collettivi. AUTO rimanda la scelta al runtime. La scelta migliore per l'implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster.

Addestra il modello

Con l'integrazione dell'API tf.distribute.Strategy in tf.keras , l'unica modifica che apporterai per distribuire la formazione a più lavoratori è racchiudere la creazione del modello e la chiamata model.compile() all'interno di strategy.scope() . L'ambito della strategia di distribuzione determina come e dove vengono create le variabili e, nel caso di MultiWorkerMirroredStrategy , le variabili create sono MirroredVariable s e vengono replicate su ciascuno dei worker.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Per funzionare effettivamente con MultiWorkerMirroredStrategy dovrai eseguire i processi di lavoro e passare loro un TF_CONFIG .

Come il file mnist.py scritto in precedenza, ecco il main.py che ciascuno dei worker eseguirà:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Nello snippet di codice precedente si noti che global_batch_size , che viene passato a Dataset.batch , è impostato su per_worker_batch_size * num_workers . Ciò garantisce che ogni lavoratore per_worker_batch_size batch di esempi per_worker_batch_size indipendentemente dal numero di lavoratori.

La directory corrente ora contiene entrambi i file Python:

ls *.py
main.py
mnist.py

Quindi json serializza TF_CONFIG e aggiungilo alle variabili d'ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora puoi avviare un processo di lavoro che eseguirà main.py e utilizzare TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.

python main.py &> job_0.log

Ci sono alcune cose da notare sul comando precedente:

  1. Usa il %%bash che è una "magia" del notebook per eseguire alcuni comandi bash.
  2. Usa il flag --bg per eseguire il processo bash in background, perché questo worker non terminerà. Aspetta tutti i lavoratori prima di iniziare.

Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &> reindirizza il suo output a un file, così puoi vedere cosa è successo.

Quindi, attendi qualche secondo affinché il processo si avvii:

import time
time.sleep(10)

Ora guarda cosa è stato emesso finora nel file di log del worker:

cat job_0.log
2020-09-12 01:27:14.389683: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-09-12 01:27:15.736635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-09-12 01:27:16.990546: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-09-12 01:27:16.990628: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1064447366
2020-09-12 01:27:16.990639: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1064447366
2020-09-12 01:27:16.990757: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2020-09-12 01:27:16.990801: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2020-09-12 01:27:16.990811: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2020-09-12 01:27:16.991256: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2020-09-12 01:27:17.002048: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2000179999 Hz
2020-09-12 01:27:17.002560: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x44b82a0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-09-12 01:27:17.002598: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-09-12 01:27:17.009677: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2020-09-12 01:27:17.010257: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:405] Started server with target: grpc://localhost:12345

L'ultima riga del file di registro dovrebbe indicare: Started server with target: grpc://localhost:12345 . Il primo lavoratore è ora pronto e attende che tutti gli altri lavoratori siano pronti a procedere.

Quindi aggiorna il tf_config per il processo del secondo lavoratore da raccogliere:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora avvia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire questo processo in background):

python main.py
Epoch 1/3
70/70 [==============================] - 4s 55ms/step - loss: 2.2622 - accuracy: 0.1663
Epoch 2/3
70/70 [==============================] - 4s 53ms/step - loss: 2.1959 - accuracy: 0.2958
Epoch 3/3
70/70 [==============================] - 4s 55ms/step - loss: 2.1158 - accuracy: 0.4607

2020-09-12 01:27:24.523408: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-09-12 01:27:25.851591: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-09-12 01:27:26.994525: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-09-12 01:27:26.994608: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1064447366
2020-09-12 01:27:26.994619: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1064447366
2020-09-12 01:27:26.994733: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2020-09-12 01:27:26.994779: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2020-09-12 01:27:26.994793: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2020-09-12 01:27:26.995232: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2020-09-12 01:27:27.003492: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2000179999 Hz
2020-09-12 01:27:27.003991: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x5b7e150 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-09-12 01:27:27.004027: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-09-12 01:27:27.010851: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2020-09-12 01:27:27.011365: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:405] Started server with target: grpc://localhost:23456
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
2020-09-12 01:27:27.936589: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:521] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Ora se ricontrolli i log scritti dal primo lavoratore vedrai che ha partecipato alla formazione di quel modello:

cat job_0.log
2020-09-12 01:27:14.389683: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-09-12 01:27:15.736635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-09-12 01:27:16.990546: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-09-12 01:27:16.990628: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1064447366
2020-09-12 01:27:16.990639: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1064447366
2020-09-12 01:27:16.990757: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2020-09-12 01:27:16.990801: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2020-09-12 01:27:16.990811: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2020-09-12 01:27:16.991256: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2020-09-12 01:27:17.002048: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2000179999 Hz
2020-09-12 01:27:17.002560: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x44b82a0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-09-12 01:27:17.002598: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-09-12 01:27:17.009677: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2020-09-12 01:27:17.010257: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:405] Started server with target: grpc://localhost:12345
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
2020-09-12 01:27:27.934554: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:521] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
Epoch 1/3
70/70 [==============================] - 4s 55ms/step - loss: 2.2622 - accuracy: 0.1663
Epoch 2/3
70/70 [==============================] - 4s 53ms/step - loss: 2.1959 - accuracy: 0.2958
Epoch 3/3
70/70 [==============================] - 4s 55ms/step - loss: 2.1158 - accuracy: 0.4607

Non sorprende che questo sia stato più lento del test eseguito all'inizio di questo tutorial. L'esecuzione di più lavoratori su una singola macchina aggiunge solo un sovraccarico. L'obiettivo qui non era migliorare il tempo di formazione, ma solo fornire un esempio di formazione multi-lavoratore.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formazione multi-lavoratore in profondità

Finora questo tutorial ha dimostrato una configurazione multi-worker di base. Il resto di questo documento esamina in dettaglio altri fattori che possono essere utili o importanti per casi d'uso reali.

Partizionamento del set di dati

Nella formazione multi-worker, è necessario lo sharding del set di dati per garantire convergenza e prestazioni.

L'esempio nella sezione precedente si basa sull'autosharding predefinito fornito dall'API tf.distribute.Strategy . È possibile controllare lo sharding impostando tf.data.experimental.AutoShardPolicy di tf.data.experimental.DistributeOptions . Per ulteriori informazioni sul partizionamento automatico, vedere la Guida all'input distribuito .

Ecco un rapido esempio di come disattivare l'auto sharding, in modo che ogni replica elabori ogni esempio (non consigliato):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Valutazione

Se si passa validation_data a model.fit , si alternerà l'addestramento e la valutazione per ogni epoca. La valutazione che prende validation_data viene distribuita sullo stesso insieme di lavoratori ei risultati della valutazione vengono aggregati e disponibili per tutti i lavoratori. Analogamente all'addestramento, il set di dati di convalida viene suddiviso automaticamente a livello di file. È necessario impostare una dimensione batch globale nel set di dati di convalida e impostare validation_steps . Si consiglia inoltre un set di dati ripetuto per la valutazione.

In alternativa, puoi anche creare un'altra attività che legga periodicamente i checkpoint ed esegua la valutazione. Questo è ciò che fa Estimator. Ma questo non è un modo consigliato per eseguire la valutazione e quindi i suoi dettagli vengono omessi.

Predizione

Attualmente model.predict non funziona con MultiWorkerMirroredStrategy.

Prestazione

Ora hai un modello Keras che è tutto impostato per essere eseguito su più worker con MultiWorkerMirroredStrategy . Puoi provare le seguenti tecniche per modificare le prestazioni della formazione multi-lavoratore con MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy fornisce molteplici implementazioni di comunicazione collettiva . RING implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione tra host. NCCL utilizza NCCL Nvidia per implementare i collettivi. AUTO rimanda la scelta al runtime. La scelta migliore per l'implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster. Per ignorare la scelta automatica, specificare un valore valido per la communication dei parametri di MultiWorkerMirroredStrategy costruttore s', ad esempio, communication=tf.distribute.experimental.CollectiveCommunication.NCCL .
  • tf.float le variabili a tf.float se possibile. Il modello ResNet ufficiale include un esempio di come questo può essere fatto.

Tolleranza ai guasti

Nella formazione sincrona, il cluster fallirebbe se uno dei worker si guasta e non esiste alcun meccanismo di ripristino in caso di errore. L'utilizzo di Keras con tf.distribute.Strategy offre il vantaggio della tolleranza ai guasti nei casi in cui i lavoratori muoiono o sono altrimenti instabili. Puoi farlo preservando lo stato di addestramento nel file system distribuito di tua scelta, in modo tale che al riavvio dell'istanza che in precedenza non era riuscita o che era stata interrotta, lo stato di addestramento viene ripristinato.

Poiché tutti i lavoratori sono sincronizzati in termini di epoche e passaggi di formazione, gli altri lavoratori dovrebbero attendere il riavvio del lavoratore fallito o anticipato per continuare.

Richiamata ModelCheckpoint

ModelCheckpoint callback ModelCheckpoint non fornisce più la funzionalità di tolleranza agli errori, utilizza invece il callback BackupAndRestore .

Il callback ModelCheckpoint può ancora essere utilizzato per salvare i checkpoint. Ma con questo, se l'addestramento è stato interrotto o terminato con successo, per continuare l'addestramento dal checkpoint, l'utente è responsabile di caricare il modello manualmente.

Facoltativamente, l'utente può scegliere di salvare e ripristinare il modello / i pesi al di fuori del callback ModelCheckpoint .

Salvataggio e caricamento del modello

Per salvare il modello utilizzando model.save o tf.saved_model.save , la destinazione del salvataggio deve essere diversa per ogni lavoratore. Sui lavoratori non capi, dovrai salvare il modello in una directory temporanea e sul capo, dovrai salvare nella directory del modello fornita. Le directory temporanee sul lavoratore devono essere univoche per evitare errori derivanti da più lavoratori che tentano di scrivere nella stessa posizione. Il modello salvato in tutte le directory è identico e in genere solo il modello salvato dal capo dovrebbe essere referenziato per il ripristino o la pubblicazione. È necessario disporre di una logica di pulizia che elimini le directory temporanee create dai lavoratori una volta completata la formazione.

Il motivo per cui devi risparmiare contemporaneamente sul capo e sui lavoratori è perché potresti aggregare variabili durante il checkpoint, il che richiede che sia il capo che i lavoratori partecipino al protocollo di comunicazione allreduce. D'altra parte, lasciare che il capo e gli operai salvino nella stessa directory del modello comporterà errori dovuti a contese.

Con MultiWorkerMirroredStrategy , il programma viene eseguito su ogni lavoratore e, per sapere se il lavoratore corrente è il capo, si avvale dell'oggetto resolver del cluster che ha attributi task_type e task_id . task_type ti dice qual è il lavoro corrente (es. 'worker'), e task_id ti dice l'identificativo del worker. Il lavoratore con ID 0 è designato come capo lavoratore.

Nello snippet di codice riportato di seguito, write_filepath fornisce il percorso del file da scrivere, che dipende write_filepath worker. Nel caso di chief (worker con id 0), scrive nel percorso del file originale; per altri, crea una directory temporanea (con id nel percorso della directory) in cui scrivere:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con questo, ora sei pronto per salvare:

multi_worker_model.save(write_model_path)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Come descritto sopra, in seguito il modello dovrebbe essere caricato solo dal percorso di salvataggio del capo, quindi rimuoviamo quelli temporanei salvati dai lavoratori non capi:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ora, quando è il momento di caricare, usiamo la comoda API tf.keras.models.load_model e continuiamo con il lavoro. Qui, supponi di utilizzare solo un singolo worker per caricare e continuare l'addestramento, nel qual caso non chiami tf.keras.models.load_model in un altro strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2929 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2736 - accuracy: 0.0016

<tensorflow.python.keras.callbacks.History at 0x7fded172d1d0>

Salvataggio e ripristino del checkpoint

D'altra parte, il checkpoint consente di salvare i pesi del modello e ripristinarli senza dover salvare l'intero modello. Qui creerai un tf.train.Checkpoint che tiene traccia del modello, che è gestito da un tf.train.CheckpointManager modo che venga conservato solo l'ultimo checkpoint.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una volta che il CheckpointManager è stato impostato, ora sei pronto per salvare e rimuovere i checkpoints non-chief worker salvati.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ora, quando è necessario eseguire il ripristino, è possibile trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint . Dopo aver ripristinato il checkpoint, puoi continuare con l'addestramento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

20/20 [==============================] - 0s 13ms/step - loss: 2.2900 - accuracy: 0.1656
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2727 - accuracy: 0.1656

<tensorflow.python.keras.callbacks.History at 0x7fded1677dd8>

Callback di BackupAndRestore

Il callback di BackupAndRestore fornisce funzionalità di tolleranza agli errori, eseguendo il backup del modello e del numero di epoca corrente in un file di checkpoint temporaneo sotto backup_dir argomento BackupAndRestore su BackupAndRestore . Questo viene fatto alla fine di ogni epoca.

Una volta che i lavori vengono interrotti e riavviati, il callback ripristina l'ultimo checkpoint e l'addestramento continua dall'inizio dell'epoca interrotta. Qualsiasi addestramento parziale già svolto nell'epoca incompiuta prima dell'interruzione verrà gettato via, in modo che non influisca sullo stato finale del modello.

Per usarlo, fornire un'istanza di tf.keras.callbacks.experimental.BackupAndRestore alla chiamata tf.keras.Model.fit() .

Con MultiWorkerMirroredStrategy, se un lavoratore viene interrotto, l'intero cluster viene sospeso finché il lavoratore interrotto non viene riavviato. Anche gli altri worker verranno riavviati e il worker interrotto si ricongiungerà al cluster. Quindi, ogni worker legge il file del checkpoint precedentemente salvato e riprende il suo stato precedente, consentendo così al cluster di tornare sincronizzato. Quindi la formazione continua.

BackupAndRestore callback di BackupAndRestore utilizza CheckpointManager per salvare e ripristinare lo stato di addestramento, che genera un file chiamato checkpoint che tiene traccia dei checkpoint esistenti insieme a quello più recente. Per questo motivo, backup_dir non dovrebbe essere riutilizzato per memorizzare altri checkpoint al fine di evitare backup_dir nome.

Attualmente, la richiamata BackupAndRestore supporta un singolo lavoratore senza strategia, MirroredStrategy e multi-worker con MultiWorkerMirroredStrategy. Di seguito sono riportati due esempi sia per la formazione multi-lavoratore che per la formazione per singolo lavoratore.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 1s 14ms/step - loss: 2.2724 - accuracy: 0.2118
Epoch 2/3
70/70 [==============================] - 1s 14ms/step - loss: 2.2001 - accuracy: 0.4250
Epoch 3/3
70/70 [==============================] - 1s 14ms/step - loss: 2.1129 - accuracy: 0.5683

<tensorflow.python.keras.callbacks.History at 0x7fded0cdea20>

Se backup_dir la directory di backup_dir hai specificato in BackupAndRestore , potresti notare alcuni file di checkpoint generati temporaneamente. Questi file sono necessari per ripristinare le istanze perse in precedenza e verranno rimossi dalla libreria alla fine di tf.keras.Model.fit() dopo l'uscita dalla formazione.

Guarda anche

  1. La guida Distributed Training in TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
  2. Modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
  3. La sezione Prestazioni nella guida fornisce informazioni su altre strategie e strumenti che è possibile utilizzare per ottimizzare le prestazioni dei modelli TensorFlow.