ML Community Day è il 9 novembre! Unisciti a noi per gli aggiornamenti da tensorflow, JAX, e più Per saperne di più

Ciclo di allenamento personalizzato con Keras e MultiWorkerMirroredStrategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza la fonte su GitHub Scarica il taccuino

Panoramica

Questo tutorial dimostra la formazione di più lavoratori con l'API del ciclo di formazione personalizzata, distribuita tramite MultiWorkerMirroredStrategy, in modo che un modello Keras progettato per l'esecuzione su un singolo lavoratore possa funzionare perfettamente su più lavoratori con modifiche minime al codice.

Utilizziamo cicli di addestramento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'addestramento. Inoltre, è più semplice eseguire il debug del modello e del ciclo di addestramento. Informazioni più dettagliate sono disponibili in Scrivere un ciclo di allenamento da zero .

Se siete alla ricerca di come utilizzare MultiWorkerMirroredStrategy con keras model.fit , fare riferimento a questa esercitazione , invece.

La guida Distributed Training in TensorFlow è disponibile per una panoramica delle strategie di distribuzione supportate da TensorFlow per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy .

Impostare

Innanzitutto, alcune importazioni necessarie.

import json
import os
import sys

Prima di importare TensorFlow, apportare alcune modifiche all'ambiente.

Disabilita tutte le GPU. Ciò impedisce errori causati da tutti i lavoratori che cercano di utilizzare la stessa GPU. Per un'applicazione reale ogni lavoratore sarebbe su una macchina diversa.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Ripristina la variabile d'ambiente TF_CONFIG , ne vedrai di più in seguito.

os.environ.pop('TF_CONFIG', None)

Assicurati che la directory corrente sia sul percorso di Python. Ciò consente al notebook di importare i file scritti da %%writefile secondo momento.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Ora importa TensorFlow.

import tensorflow as tf

Dataset e definizione del modello

Quindi crea un file mnist.py con una semplice configurazione del modello e del set di dati. Questo file python verrà utilizzato dai processi di lavoro in questo tutorial:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Configurazione multi-lavoratore

Ora entriamo nel mondo della formazione multilavoratore. In TensorFlow, la variabile di ambiente TF_CONFIG è necessaria per l'addestramento su più macchine, ognuna delle quali può avere un ruolo diverso. TF_CONFIG utilizzato di seguito, è una stringa JSON utilizzata per specificare la configurazione del cluster su ciascun lavoratore che fa parte del cluster. Questo è il metodo predefinito per specificare un cluster, utilizzando cluster_resolver.TFConfigClusterResolver , ma sono disponibili altre opzioni nel modulo distribute.cluster_resolver .

Descrivi il tuo cluster

Ecco una configurazione di esempio:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ecco lo stesso TF_CONFIG serializzato come stringa JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Ci sono due componenti di TF_CONFIG : cluster e task .

  • cluster è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict composto da diversi tipi di lavori come worker . Nella formazione multi-lavoratore con MultiWorkerMirroredStrategy , di solito c'è un worker che si assume un po' più di responsabilità come salvare il checkpoint e scrivere un file di riepilogo per TensorBoard oltre a ciò che fa un normale worker . Tale lavoratore è indicato come il chief operaio, ed è consuetudine che il worker con index 0 sia nominato capo worker (in effetti è così tf.distribute.Strategy viene implementato tf.distribute.Strategy ).

  • task fornisce informazioni sull'attività corrente ed è diverso per ogni lavoratore. Specifica il type e l' index di quel lavoratore.

In questo esempio, imposti il type attività su "worker" e l' index attività su 0 . Questa macchina è il primo lavoratore e sarà nominato capo operaio e farà più lavoro degli altri. Si noti che anche altre macchine dovranno avere la variabile di ambiente TF_CONFIG impostata e dovrebbe avere lo stesso cluster dict, ma diverso type attività o index attività a seconda dei ruoli di tali macchine.

A scopo illustrativo, questo tutorial mostra come impostare un TF_CONFIG con 2 worker su localhost . In pratica, gli utenti creerebbero più worker su indirizzi IP/porte esterni e imposterebbero TF_CONFIG su ogni worker in modo appropriato.

In questo esempio utilizzerai 2 lavoratori, il TF_CONFIG del primo lavoratore è mostrato sopra. Per il secondo lavoratore dovresti impostare tf_config['task']['index']=1

Sopra, tf_config è solo una variabile locale in python. Per usarlo effettivamente per configurare l'addestramento, questo dizionario deve essere serializzato come JSON e inserito nella variabile di ambiente TF_CONFIG .

Variabili d'ambiente e sottoprocessi nei notebook

I sottoprocessi ereditano le variabili di ambiente dal genitore. Quindi, se imposti una variabile di ambiente in questo processo del jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

È possibile accedere alla variabile di ambiente da un sottoprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Nella prossima sezione, lo userai per passare TF_CONFIG ai sottoprocessi di lavoro. Non lanceresti mai i tuoi lavori in questo modo, ma è sufficiente per gli scopi di questo tutorial: per dimostrare un esempio minimo multi-operaio.

MultiWorkerMirroredStrategy

Per addestrare il modello, usa un'istanza di tf.distribute.MultiWorkerMirroredStrategy , che crea copie di tutte le variabili nei livelli del modello su ogni dispositivo in tutti i lavoratori. La guida tf.distribute.Strategy contiene maggiori dettagli su questa strategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

Utilizzare tf.distribute.Strategy.scope per specificare che deve essere utilizzata una strategia durante la creazione del modello. Questo ti mette nel " contesto di replica incrociata " per questa strategia, il che significa che la strategia ha il controllo di cose come il posizionamento variabile.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Dividi automaticamente i tuoi dati tra i lavoratori

Nella formazione multi-lavoratore, lo sharding del set di dati non è necessariamente necessario, tuttavia fornisce una semantica esattamente una volta che rende più riproducibile una maggiore formazione, ad es. la formazione su più lavoratori dovrebbe essere uguale alla formazione su un lavoratore. Nota: le prestazioni possono essere influenzate in alcuni casi.

Vedi: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Definisci il ciclo di addestramento personalizzato e addestra il modello

Specifica un ottimizzatore

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Definisci una fase di allenamento con tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Salvataggio e ripristino del checkpoint

L'implementazione del checkpoint in un ciclo di addestramento personalizzato richiede che l'utente lo gestisca invece di utilizzare un callback keras. Consente di salvare i pesi del modello e ripristinarli senza dover salvare l'intero modello.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Qui creerai un tf.train.Checkpoint che tiene traccia del modello, che è gestito da un tf.train.CheckpointManager modo che venga preservato solo l'ultimo checkpoint.

05567f7a0

Ora, quando hai bisogno di ripristinare, puoi trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Dopo aver ripristinato il checkpoint, puoi continuare con l'addestramento del tuo ciclo di allenamento personalizzato.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

Configurazione completa del codice sui lavoratori

Per funzionare effettivamente con MultiWorkerMirroredStrategy dovrai eseguire i processi di lavoro e passare loro un TF_CONFIG .

Come il file mnist.py scritto in precedenza, ecco il main.py che contiene lo stesso codice che abbiamo main.py passo dopo passo in precedenza in questa colab, lo stiamo semplicemente scrivendo su un file in modo che ciascuno dei lavoratori lo eseguirà:

File: main.py

Writing main.py

Formare e valutare

La directory corrente ora contiene entrambi i file Python:

ls *.py
main.py
mnist.py

Quindi json-serializza TF_CONFIG e aggiungilo alle variabili di ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora puoi avviare un processo di lavoro che eseguirà main.py e utilizzerà TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ci sono alcune cose da notare sul comando precedente:

  1. Usa il %%bash che è una "magia" del notebook per eseguire alcuni comandi bash.
  2. Usa il flag --bg per eseguire il processo bash in background, perché questo worker non verrà terminato. Aspetta tutti i lavoratori prima di iniziare.

Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &> reindirizza il suo output a un file, così puoi vedere cosa è successo.

Quindi, attendi qualche secondo per l'avvio del processo:

import time
time.sleep(20)

Ora guarda cosa è stato emesso finora nel file di registro del lavoratore:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

L'ultima riga del file di registro dovrebbe dire: Started server with target: grpc://localhost:12345 . Il primo lavoratore è ora pronto e sta aspettando che tutti gli altri lavoratori siano pronti per procedere.

Quindi aggiorna il tf_config per il processo del secondo lavoratore da raccogliere:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora avvia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire questo processo in background):

python main.py > /dev/null 2>&1

Ora, se ricontrolla i log scritti dal primo lavoratore, vedrai che ha partecipato all'addestramento di quel modello:

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formazione approfondita multi lavoratore

Questo tutorial ha dimostrato un flusso di lavoro del Custom Training Loop della configurazione multi-lavoratore. Una descrizione dettagliata di altri argomenti è disponibile nella model.fit's guide di model.fit's guide configurazione multi-worker e applicabile ai CTL.

Guarda anche

  1. La guida Distributed Training in TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
  2. Modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
  3. La sezione Prestazioni nella guida fornisce informazioni su altre strategie e strumenti che è possibile utilizzare per ottimizzare le prestazioni dei modelli TensorFlow.