Formazione sul server dei parametri con ParameterServerStrategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza la fonte su GitHub Scarica taccuino

Panoramica

Formazione Server Parametro è un metodo di dati parallelo comune a scalare la formazione modello su più macchine.

Un cluster di server di formazione parametro è costituito da lavoratori e dei server di parametri. Le variabili vengono create sui server dei parametri e vengono lette e aggiornate dai lavoratori in ogni passaggio. Per impostazione predefinita, i lavoratori leggono e aggiornano queste variabili in modo indipendente senza sincronizzarsi tra loro. Questo è il motivo per cui a volte il parametro di formazione in stile server si chiama formazione asincrona.

In tensorflow 2, la formazione del server parametro è alimentato dal tf.distribute.experimental.ParameterServerStrategy classe, che distribuisce le fasi di formazione per un gruppo in grado di scalare fino a migliaia di lavoratori (accompagnato dai server dei parametri).

Metodi di allenamento supportati

Esistono due principali metodi di formazione supportati:

Un cluster con lavori e compiti

Indipendentemente delle API di scelta ( Model.fit o un ciclo di formazione personalizzato), la formazione distribuito in tensorflow 2 comporta: un 'cluster' con diversi 'jobs' , e ciascuno dei posti di lavoro può avere uno o più 'tasks' .

Quando si utilizza l'addestramento del server dei parametri, si consiglia di avere:

  • Un lavoro coordinatore (che ha il nome del lavoro chief )
  • Posti di lavoro dei lavoratori multiple (nome del lavoro worker ); e
  • Lavori server parametro multiple (nome del lavoro ps )

Mentre il coordinatore crea risorse, dispacci formazione compiti, scrive posti di blocco, e si occupa di errori di attività, i lavoratori ei server di parametri corrono tf.distribute.Server che ascoltare le richieste del coordinatore.

Formazione Server Parametro con Model.fit API

Formazione del server parametro con il Model.fit API richiede il coordinatore di usare un tf.distribute.experimental.ParameterServerStrategy oggetto, e un tf.keras.utils.experimental.DatasetCreator come ingresso. Simile a Model.fit utilizzo con nessuna strategia, o con altre strategie, il flusso di lavoro prevede la creazione e la compilazione del modello, la redazione dei callback, seguita da una Model.fit chiamata.

Addestramento del server dei parametri con un ciclo di addestramento personalizzato

Con i cicli di formazione personalizzati, il tf.distribute.experimental.coordinator.ClusterCoordinator classe è il componente chiave utilizzata per il coordinatore.

Il più importante API fornita dal ClusterCoordinator oggetto è schedule :

  • Il schedule API accoda un tf.function e restituisce un futuro simile RemoteValue immediatamente.
  • Le funzioni in coda saranno spediti ai lavoratori remoti in thread in background e la loro RemoteValue s saranno riempiti in modo asincrono.
  • Dato schedule non richiede l'assegnazione dei lavoratori, il tf.function passò in può essere eseguito su qualsiasi lavoratore a disposizione.
  • Se il lavoratore su cui viene eseguito diventa non disponibile prima del suo completamento, la funzione verrà ritentata su un altro lavoratore disponibile.
  • A causa di questo fatto e del fatto che l'esecuzione della funzione non è atomica, una funzione può essere eseguita più di una volta.

In aggiunta alle funzioni di dispacciamento a distanza, il ClusterCoordinator aiuta anche a creare set di dati su tutti i lavoratori e ricostruire questi insiemi di dati quando un lavoratore recupera da fallimento.

Configurazione tutorial

Il tutorial si diramano in Model.fit e percorsi ciclo di formazione personalizzati, e si può scegliere quello che si adatta alle vostre esigenze. Le sezioni diverse da "Formazione con X" sono applicabili a entrambi i percorsi.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

Configurazione del cluster

Come accennato in precedenza, un cluster di server di formazione parametro richiede un compito coordinatore che esegue il programma di allenamento, uno o più lavoratori e le attività del server di parametri che correre tensorflow Server- tf.distribute.Server -e forse un compito di valutazione ulteriore che la valutazione corre sidecar (vedi la sezione di valutazione del sidecar di seguito). I requisiti per configurarli sono:

  • L'attività del coordinatore deve conoscere gli indirizzi e le porte di tutti gli altri server TensorFlow eccetto il valutatore.
  • I lavoratori e i server dei parametri devono sapere quale porta devono ascoltare. Per semplicità, di solito è possibile passare le informazioni complete sul cluster durante la creazione di server TensorFlow su queste attività.
  • L'attività del valutatore non deve conoscere l'impostazione del cluster di formazione. In caso affermativo, non dovrebbe tentare di connettersi al cluster di addestramento.
  • I lavoratori ed i server di parametri dovrebbero avere tipi di attività come "worker" e "ps" , rispettivamente. Il coordinatore dovrebbe usare "chief" come tipo di attività per motivi di eredità.

In questo tutorial, creerai un cluster in-process in modo che l'intero training del server dei parametri possa essere eseguito in Colab. Imparerete come impostare i cluster reali in una sezione successiva.

Cluster in corso

Inizierai creando diversi server TensorFlow in anticipo e connettendoti ad essi in seguito. Si noti che questo è solo per lo scopo della manifestazione di questo tutorial, e in allenamento vero e proprio server sarà avviato il "worker" e "ps" macchine.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

L'installazione del cluster in-process viene frequentemente utilizzata nel test di unità, come qui .

Un'altra opzione per il test locale è di lanciare i processi sulla macchina-check out locale di formazione multi-lavoratore con Keras per un esempio di questo approccio.

Crea un'istanza di ParameterServerStrategy

Prima di tuffarsi nel codice di formazione, cerchiamo di creare un'istanza di un ParameterServerStrategy oggetto. Si noti che questo è necessario, indipendentemente dal fatto che si sta procedendo con Model.fit o un ciclo di formazione personalizzato. Il variable_partitioner argomento sarà spiegato nella sezione sharding variabile .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

Per utilizzare le GPU per la formazione, allocare le GPU visibili a ciascun lavoratore. ParameterServerStrategy utilizzerà tutte le GPU disponibili su ogni lavoratore, con la restrizione che tutti i lavoratori dovrebbero avere lo stesso numero di GPU disponibili.

Sharding variabile

Sharding variabile si riferisce alla scissione una variabile in più variabili più piccoli, che sono chiamati frammenti. Lo sharding variabile può essere utile per distribuire il carico di rete quando si accede a questi shard. È anche utile distribuire il calcolo e l'archiviazione di una variabile normale su più server di parametri.

Per abilitare sharding variabile, è possibile passare in un variable_partitioner quando si costruisce un ParameterServerStrategy oggetto. Il variable_partitioner verrà richiamato ogni volta che viene creata una variabile e si prevede di restituire il numero di frammenti lungo ogni dimensione della variabile. Alcuni out-of-box variable_partitioner s sono forniti come tf.distribute.experimental.partitioners.MinSizePartitioner . Si consiglia di utilizzare partitori size-based come tf.distribute.experimental.partitioners.MinSizePartitioner per evitare il partizionamento piccole variabili, che potrebbero avere un impatto negativo sulla velocità di formazione del modello.

Quando un variable_partitioner viene passato e se si crea una variabile direttamente sotto strategy.scope() , diventerà un tipo di contenitore con variables proprietà che consente di accedere alla lista di frammenti. Nella maggior parte dei casi, questo contenitore verrà automaticamente convertito in un tensore concatenando tutti i frammenti. Di conseguenza, può essere utilizzata come una variabile normale. D'altra parte, alcuni metodi tensorflow come tf.nn.embedding_lookup prevedono attuazione efficiente per questo tipo di contenitore e in questi metodi concatenazione automatica sarà evitata.

Si prega di consultare la documentazione API di tf.distribute.experimental.ParameterServerStrategy per maggiori dettagli.

Formazione con Model.fit

Keras fornisce un'API di formazione facile da usare tramite Model.fit che gestisce il ciclo di formazione sotto il cofano, con la flessibilità di override train_step , e callback, che forniscono funzionalità come il salvataggio checkpoint o sintesi risparmio per TensorBoard. Con Model.fit , lo stesso codice di formazione può essere utilizzato per altre strategie con un semplice scambio dell'oggetto strategia.

Dati in ingresso

Model.fit con la formazione del server parametro richiede che i dati di input essere forniti in un richiamabile che prende un singolo argomento di tipo tf.distribute.InputContext , e restituisce un tf.data.Dataset . Quindi, creare un tf.keras.utils.experimental.DatasetCreator oggetto che richiede tale callable , e un optional tf.distribute.InputOptions oggetto tramite input_options argomento.

Si noti che si consiglia di mischiare e ripetere i dati con la formazione del server parametro, e specificare steps_per_epoch in fit chiamata in modo biblioteca conosce i confini d'epoca.

Si prega di consultare la ingresso Distributed tutorial per ulteriori informazioni sulla InputContext argomento.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Il codice in dataset_fn verrà richiamato sul dispositivo di input, che di solito è la CPU, su ciascuna delle macchine worker.

Costruzione e compilazione del modello

Ora, si creerà un tf.keras.Model banale -a tf.keras.models.Sequential modello per scopi dimostrativi-seguita da un Model.compile chiamata di incorporare componenti, come un ottimizzatore, metriche o parametri quali steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Richiamate e formazione

Prima di chiamare model.fit per la formazione vera e propria, cerchiamo di preparare i callback necessarie per le attività comuni, come ad esempio:

  • ModelCheckpoint : per salvare i pesi modello.
  • BackupAndRestore : per assicurarsi che i progressi di formazione viene automaticamente eseguito il backup, e recuperato se l'esperienza a grappolo indisponibilità (come interruzione o prelazione); o
  • TensorBoard : per salvare le relazioni sui progressi compiuti in file di riepilogo, che vengono visualizzati in strumento TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

Utilizzo diretto con ClusterCoordinator (opzionale)

Anche se si sceglie la Model.fit percorso formativo, si può opzionalmente un'istanza di un tf.distribute.experimental.coordinator.ClusterCoordinator oggetto per programmare altre funzioni che si desidera eseguire sui lavoratori. Vedere la formazione con un ciclo di formazione personalizzato per ulteriori dettagli ed esempi.

Allenamento con un ciclo di allenamento personalizzato

Utilizzando i cicli di formazione personalizzato con tf.distribute.Strategy offre una grande flessibilità per definire i cicli di formazione. Con il ParameterServerStrategy definito sopra (come strategy ), si utilizzerà un tf.distribute.experimental.coordinator.ClusterCoordinator di spedire l'esecuzione di fasi di formazione per i lavoratori remoti.

Poi, verrà creato un modello, definire un set di dati e una funzione a gradino, come avete fatto nel ciclo di formazione con altri tf.distribute.Strategy s. Potete trovare maggiori dettagli nella formazione personalizzata con tf.distribute.Strategy tutorial.

Per garantire un efficace prefetching set di dati, utilizzare la raccomandata distribuiti set di dati API creazione menzionati nei passi di formazione spedizione verso i lavoratori a distanza sezione sottostante. Inoltre, assicurarsi di chiamare Strategy.run all'interno worker_fn di trarre pieno vantaggio di GPU destinate ai lavoratori. Il resto dei passaggi è lo stesso per l'allenamento con o senza GPU.

Creiamo questi componenti nei seguenti passaggi:

Imposta i dati

In primo luogo, scrivere una funzione che crea un insieme di dati che include la trasformazione logica implementata dal KERAS strati di pre-elaborazione .

Potrai creare questi strati esterni della dataset_fn ma applicare la trasformazione all'interno del dataset_fn , dal momento che si avvolgere la dataset_fn in un tf.function , che non consente le variabili da creare al suo interno.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

Genera esempi di giocattoli in un set di dati:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Quindi, creare il set di dati di formazione avvolto in un dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Costruisci il modello

Quindi, crea il modello e altri oggetti. Assicurati di creare tutte le variabili sotto strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Confermano di Let che l'uso di FixedShardsPartitioner diviso tutte le variabili in due frammenti e ogni frammento è stato assegnato a server diversi parametri:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Definire la fase di formazione

In terzo luogo, creare la fase di formazione avvolto in un tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Nella funzione fase di formazione di cui sopra, chiamando Strategy.run e Strategy.reduce nel step_fn in grado di supportare più GPU per lavoratore. Se i lavoratori hanno assegnato GPU, Strategy.run distribuirà i set di dati su più repliche.

Invia i passaggi di formazione ai lavoratori a distanza

Dopo che tutti i calcoli sono definiti da ParameterServerStrategy , si utilizzerà la tf.distribute.experimental.coordinator.ClusterCoordinator classe per creare risorse e distribuire i passi di formazione per i lavoratori remoti.

Diamo prima creare un ClusterCoordinator oggetto e passare l'oggetto di strategia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Quindi, crea un set di dati per lavoratore e un iteratore. Nelle per_worker_dataset_fn sotto, avvolgendo il dataset_fn in strategy.distribute_datasets_from_function si raccomanda che prelettura efficiente GPU senza problemi.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Il passo finale è di distribuire il calcolo di lavoratori remoti utilizzando ClusterCoordinator.schedule :

  • Il schedule metodo accoda un tf.function e restituisce un futuro simile RemoteValue immediatamente. Le funzioni in coda saranno spediti ai lavoratori remoti in thread in background e la RemoteValue saranno riempiti in modo asincrono.
  • Il join il metodo ( ClusterCoordinator.join ) può essere utilizzato per aspettare fino a quando vengono eseguite tutte le funzioni programmate.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Ecco come è possibile recuperare il risultato di una RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

In alternativa, puoi avviare tutti i passaggi e fare qualcosa in attesa del completamento:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Per la formazione completa e il flusso di lavoro che servono per questo particolare esempio, si prega di consultare questa prova .

Ulteriori informazioni sulla creazione di set di dati

Il set di dati nel codice sopra viene creata utilizzando il ClusterCoordinator.create_per_worker_dataset API). Crea un set di dati per lavoratore e restituisce un oggetto contenitore. È possibile chiamare iter metodo su di esso per creare un iteratore per lavoratore. Il per lavoratore iteratore contiene un iteratore per lavoratore e la fetta corrispondente di un lavoratore sostituito nell'argomento ingresso della funzione passato al ClusterCoordinator.schedule metodo prima la funzione viene eseguita su un determinato lavoratore.

Attualmente, il ClusterCoordinator.schedule metodo presuppone lavoratori sono equivalenti e assume quindi le serie di dati sui lavoratori differenti sono le stesse tranne possono essere mescolate in modo diverso se contengono un Dataset.shuffle operazione. Per questo motivo, si raccomanda inoltre che i set di dati da ripetere all'infinito e pianificare un numero finito di passi anziché basarsi sulla OutOfRangeError da un set di dati.

Un'altra nota importante è che tf.data set di dati non supportano la serializzazione e deserializzazione implicita attraverso i confini di attività. Quindi è importante per creare l'intero set di dati all'interno della funzione passata a ClusterCoordinator.create_per_worker_dataset .

Valutazione

Esiste più di un modo per definire ed eseguire un ciclo di valutazione nella formazione distribuita. Ognuno ha i suoi pro e contro come descritto di seguito. Il metodo di valutazione in linea è consigliato se non si hanno preferenze.

Valutazione in linea

In questo metodo, si alterna coordinatore tra formazione e la valutazione, e quindi è chiamata è la valutazione in linea.

Ci sono diversi vantaggi della valutazione in linea. Per esempio:

  • Può supportare modelli di valutazione di grandi dimensioni e set di dati di valutazione che una singola attività non può contenere.
  • I risultati della valutazione possono essere utilizzati per prendere decisioni per la formazione dell'epoca successiva.

Esistono due modi per implementare la valutazione in linea: valutazione diretta e valutazione distribuita.

  • Valutazione diretta: Per i piccoli modelli e set di dati di valutazione, il coordinatore può essere eseguito valutazione direttamente sul modello distribuito con il set di dati di valutazione sul coordinatore:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Valutazione distribuita: Per i modelli di grandi dimensioni o di insiemi di dati che sono fattibile per essere eseguito direttamente sul coordinatore, il compito coordinatore può distribuire compiti di valutazione ai lavoratori attraverso i ClusterCoordinator.schedule / ClusterCoordinator.join metodi:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Valutazione del sidecar

Un altro metodo è chiamato valutazione sidecar in cui si crea un'attività valutatore dedicato che più volte si legge posti di blocco e gira la valutazione su un ultimo punto di controllo. Consente al programma di allenamento di terminare in anticipo se non è necessario modificare il ciclo di allenamento in base ai risultati della valutazione. Tuttavia, richiede un'attività di valutatore aggiuntiva e un checkpoint periodico per attivare la valutazione. Di seguito è riportato un possibile ciclo di valutazione del sidecar:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Cluster nel mondo reale

In un ambiente di produzione reale, eseguirai tutte le attività in diversi processi su macchine diverse. Il modo più semplice per configurare le informazioni di cluster su ogni compito è quello di impostare "TF_CONFIG" variabili di ambiente e utilizzare un tf.distribute.cluster_resolver.TFConfigClusterResolver per analizzare "TF_CONFIG" .

Per una descrizione generale su "TF_CONFIG" variabili di ambiente, fare riferimento alla formazione Distributed guida.

Se si avvia le attività di formazione utilizzando kubernetes o di altri modelli di configurazione, è molto probabile che questi modelli hanno già impostato “TF_CONFIG" per voi.

Impostare il "TF_CONFIG" variabile d'ambiente

Supponiamo di avere 3 operai e 2 server di parametri, il "TF_CONFIG" di lavoratore 1 può essere:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

Il "TF_CONFIG" del valutatore può essere:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Il "cluster" parte al di sopra "TF_CONFIG" stringa per il valutatore è facoltativo.

Se usi lo stesso binario per tutte le attività

Se preferisci eseguire tutte queste attività utilizzando un singolo binario, dovrai lasciare che il tuo programma si dirama in ruoli diversi all'inizio:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Il codice seguente avvia un server TensorFlow e attende:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Gestire il fallimento dell'attività

Fallimento del lavoratore

tf.distribute.experimental.coordinator.ClusterCoordinator o Model.fit forniscono built-in tolleranza ai guasti per il fallimento dei lavoratori. Al momento il recupero dei lavoratori, la funzione precedentemente fornito set di dati (sia per ClusterCoordinator.create_per_worker_dataset per un ciclo di formazione personalizzato, o tf.keras.utils.experimental.DatasetCreator per Model.fit ) saranno richiamati i lavoratori a ricreare i set di dati.

Errore del server dei parametri o del coordinatore

Tuttavia, quando il coordinatore vede un errore del server parametro, genererà un UnavailableError o AbortedError immediatamente. In questo caso puoi riavviare il coordinatore. Anche il coordinatore stesso può diventare non disponibile. Pertanto, alcuni strumenti sono consigliati per non perdere i progressi della formazione:

  • Per Model.fit , è necessario utilizzare un BackupAndRestore callback, che gestisce il risparmio di avanzamento e il ripristino automatico. Vedere richiamate e formazione sezione precedente per un esempio.

  • Per un ciclo di addestramento personalizzato, è necessario eseguire periodicamente il checkpoint delle variabili del modello e caricare le variabili del modello da un checkpoint, se presente, prima dell'inizio dell'addestramento. Il progresso di formazione si può dedurre circa da optimizer.iterations se un ottimizzatore è un punto di arresto:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Recupero di un RemoteValue

Recupero di un RemoteValue è garantito per avere successo se una funzione viene eseguita con successo. Questo perché attualmente il valore restituito viene copiato immediatamente nel coordinatore dopo l'esecuzione di una funzione. Se si verifica un errore del lavoratore durante la copia, la funzione verrà ritentata su un altro lavoratore disponibile. Pertanto, se si desidera ottimizzare le prestazioni, è possibile pianificare le funzioni senza un valore restituito.

Segnalazione errori

Una volta che il coordinatore vede un errore come UnavailableError dai server dei parametri o altri errori di applicazione come ad esempio un InvalidArgument da tf.debugging.check_numerics , che cancellerà tutte le funzioni in corso e in coda prima di sollevare l'errore. Recupero loro corrispondenti RemoteValue s alzerà un CancelledError .

Dopo che viene generato un errore, il coordinatore non solleverà lo stesso errore o alcun errore dalle funzioni annullate.

Miglioramento delle prestazioni

Ci sono diversi motivi possibili se si vede problemi di prestazioni quando ci si allena con ParameterServerStrategy e ClusterResolver .

Un motivo comune è che i server dei parametri hanno un carico sbilanciato e alcuni server dei parametri molto carichi hanno raggiunto la capacità. Ci possono essere anche più cause alla radice. Alcuni semplici metodi per mitigare questo problema sono:

  1. Shard vostri grandi variabili del modello tramite specificando un variable_partitioner quando si costruisce un ParameterServerStrategy .
  2. Se possibile, evitare di creare una variabile hotspot richiesta da tutti i server dei parametri in un unico passaggio. Ad esempio, utilizzare un tasso di apprendimento costante o sottoclasse tf.keras.optimizers.schedules.LearningRateSchedule nel ottimizzatori dal momento che il comportamento di default è che il tasso di apprendimento diventerà una variabile posto su un particolare server dei parametri e richiesto da tutti gli altri server di parametri in ogni fase .
  3. Mescola i tuoi grandi vocabolari prima di passarli ai livelli di pre-elaborazione di Keras.

Un altro possibile motivo per problemi di prestazioni è il coordinatore. Tua prima implementazione del schedule / join è basato su Python e quindi possono avere filettatura sovraccarico. Anche la latenza tra il coordinatore e gli operatori può essere elevata. Se questo è il caso,

  • Per Model.fit , è possibile impostare steps_per_execution argomento fornito al Model.compile ad un valore superiore a 1.

  • Per un ciclo di formazione personalizzato, è possibile confezionare più passaggi in un unico tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Poiché la libreria è ulteriormente ottimizzata, si spera che la maggior parte degli utenti non dovrà impacchettare manualmente i passaggi in futuro.

Inoltre, un piccolo trucco per il miglioramento delle prestazioni consiste nel pianificare le funzioni senza un valore restituito, come spiegato nella precedente sezione relativa agli errori delle attività di gestione.

Limitazioni note

La maggior parte delle limitazioni note sono già trattate nelle sezioni precedenti. Questa sezione fornisce un riepilogo.

ParameterServerStrategy generale

  • os.environment["grpc_fail_fast"]="use_caller" è necessaria per ogni attività tra cui il coordinatore, per far funzionare correttamente la tolleranza ai guasti.
  • La formazione del server dei parametri sincroni non è supportata.
  • Di solito è necessario raggruppare più passaggi in un'unica funzione per ottenere prestazioni ottimali.
  • Non è supportato per caricare un saved_model tramite tf.saved_model.load contenente variabili sharded. Nota che il caricamento di un tale modello salvato utilizzando TensorFlow Serving dovrebbe funzionare.
  • Non è supportato caricare un checkpoint contenente variabili di slot dell'ottimizzatore partizioni in un numero diverso di partizioni.
  • Non è supportato il ripristino da un errore del server dei parametri senza riavviare l'attività del coordinatore.
  • L'utilizzo di tf.lookup.StaticHashTable (che è comunemente impiegato da alcuni tf.keras.layers.experimental.preprocessing strati, come IntegerLookup , StringLookup , e TextVectorization ) si traduce in risorse immesse sul coordinatore in questo momento con la formazione del server parametro. Ciò ha implicazioni sulle prestazioni per la ricerca di RPC dai lavoratori al coordinatore. Questa è un'alta priorità attuale da affrontare.

Model.fit specifiche

  • steps_per_epoch argomento è obbligatorio in Model.fit . È possibile selezionare un valore che fornisce intervalli appropriati in un'epoca.
  • ParameterServerStrategy non ha il supporto per le richiamate personalizzate che hanno chiamate a livello di gruppo per motivi di prestazioni. Si dovrebbe convertire le chiamate in chiamate a livello epoca con opportunamente scelto steps_per_epoch , in modo che essi sono chiamati ogni steps_per_epoch numero di passi. I callback incorporati non sono interessati: le loro chiamate a livello di batch sono state modificate per essere performanti. Sostenere chiamate a livello di gruppo per ParameterServerStrategy è stata pianificata.
  • Per lo stesso motivo, a differenza di altre strategie, la barra di avanzamento e le metriche vengono registrate solo ai confini dell'epoca.
  • run_eagerly non è supportato.

Specifiche del ciclo di allenamento personalizzato