Dzień Społeczności ML jest 9 listopada! Dołącz do nas na aktualizacje z TensorFlow Jax i więcej Dowiedz się więcej

Migracja szkolenia CPU/GPU dla wielu pracowników

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Ten przewodnik pokazuje, jak przeprowadzić migrację rozproszonego przepływu pracy szkolenia obejmującego wielu pracowników z TensorFlow 1 do TensorFlow 2.

Aby przeprowadzić szkolenie wieloosobowe z wykorzystaniem procesorów/GPU:

Ustawiać

Zacznij od kilku niezbędnych importów i prostego zestawu danych do celów demonstracyjnych:

# Install tf-nightly as the notebook uses a dataset instance for `Model.fit`
# with `ParameterServerStrategy`, which depends on symbols in TF 2.7.
!pip uninstall -q -y tensorflow keras
!pip install -q tf-nightly
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

Będziesz potrzebował 'TF_CONFIG' konfiguracji zmienną środowiskową dla szkoleń na wielu komputerach w TensorFlow. Użyj opcji 'TF_CONFIG' określenie 'cluster' i 'task' S'adresów. (Dowiedz się więcej w Distributed_training przewodnika).

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Użyj del oświadczenie usunąć zmienną (ale w świecie rzeczywistym szkolenia wielu pracowników w TensorFlow 1, nie będzie musiał tego robić):

del os.environ['TF_CONFIG']

TensorFlow 1: Szkolenie rozproszone dla wielu pracowników z interfejsami API tf.estimator

Poniższy fragment kodu demonstruje kanoniczną obieg szkolenia wielu pracowników w TF1: można użyć tf.estimator.Estimator , A tf.estimator.TrainSpec , A tf.estimator.EvalSpec i tf.estimator.train_and_evaluate API do dystrybucji trening:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpu5zsvkn2
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpu5zsvkn2', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.0176871, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-22T20:01:54
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.16660s
INFO:tensorflow:Finished evaluation at 2021-09-22-20:01:55
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.0040814565
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.034454126.
({'loss': 0.0040814565, 'global_step': 3}, [])

TensorFlow 2: Szkolenie dla wielu pracowników ze strategiami dystrybucji

W TensorFlow 2, rozmieszczone w wielu szkoleń pracowników z CPU, GPU i TPU odbywa się poprzez tf.distribute.Strategy s.

Poniższy przykład pokazuje, jak używać dwie takie strategie: tf.distribute.experimental.ParameterServerStrategy i tf.distribute.MultiWorkerMirroredStrategy , z których oba są przeznaczone do szkolenia CPU / GPU z wielu pracowników.

ParameterServerStrategy zatrudnia koordynatora ( 'chief' ), co czyni go bardziej przyjaznym ze środowiskiem w tym Colab notebooka. Będziesz użyciu niektórych narzędzi, żeby założyć elementy uzupełniające istotne dla runnable doświadczenie tutaj: można utworzyć klaster w procesie, w którym wątki są używane do symulowania serwery parametrów ( 'ps' ) oraz pracowników ( 'worker' ) . Aby uzyskać więcej informacji na temat szkolenia serwerze parametr, odnoszą się do szkolenia serwerze parametryczny z ParameterServerStrategy tutoriala.

W tym przykładzie, pierwszy określenie 'TF_CONFIG' zmienną środowiska z tf.distribute.cluster_resolver.TFConfigClusterResolver dostarczenie informacji klastra. Jeśli używasz systemu zarządzania klastrem dla rozproszonego szkolenia, sprawdź czy to zapewnia 'TF_CONFIG' dla ciebie już, w takim przypadku nie trzeba jawnie ustawić tę zmienną środowiskową. (Dowiedz się więcej w konfigurowaniu 'TF_CONFIG' zmiennym przekroju środowisko w Ukazuje szkolenia z TensorFlow przewodnika).

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

Następnie utwórz tf.distribute.Server s dla pracowników i serwerów parametrów jeden po jednym:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

W świecie rzeczywistym rozprowadzane szkolenia, zamiast rozpoczynać wszystkich tf.distribute.Server s na koordynatora, zostanie za pomocą wielu komputerów, a te, które są oznaczone jako "worker" S "ps" (serwery parametrów) będzie każdy uruchomić tf.distribute.Server . Patrz Klastry w realnym części świata w szkolenia serwer parametryczny tutorialu więcej szczegółów.

Z wszystko gotowe, utwórz ParameterServerStrategy obiektu:

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Po utworzeniu obiektu strategii, określenie modelu, optymalizator i inne zmienne, i wywołać Keras Model.compile w Strategy.scope API do dystrybucji szkolenia. (Zapoznaj się z Strategy.scope docs API, aby uzyskać więcej informacji.)

Jeśli wolisz, aby dostosować szkolenia poprzez, na przykład, określanie przodu i do tyłu podań znajdują się szkolenia z niestandardowym szkoleniowej sekcji pętli w parametryczny szkolenia serwer tutorialu więcej szczegółów.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-09-22 20:01:56.897008: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.899102: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.914645: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 2s - loss: 0.1728 - 2s/epoch - 176ms/step
Epoch 2/5
10/10 - 0s - loss: 0.0102 - 73ms/epoch - 7ms/step
Epoch 3/5
10/10 - 0s - loss: 0.0091 - 71ms/epoch - 7ms/step
Epoch 4/5
10/10 - 0s - loss: 0.0083 - 71ms/epoch - 7ms/step
Epoch 5/5
10/10 - 0s - loss: 0.0077 - 73ms/epoch - 7ms/step
<keras.callbacks.History at 0x7fec502068d0>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 12s - loss: 0.0761
2021-09-22 20:01:58.817232: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:58.997047: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
8/10 [=======================>......] - ETA: 0s - loss: 0.2278
2021-09-22 20:01:59.177706: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 50ms/step - loss: 0.2356
{'loss': 0.23556283}

Zaborczych ( tf.distribute.experimental.partitioners )

ParameterServerStrategy w TensorFlow 2 wsporniki zmienny podział i oferty samych zaborców jak TensorFlow 1, z mniej skomplikowany nazwami: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : a partycjonowania, który utrzymuje odłamki na podstawie wielkości maksymalnej) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : a partycjonujący że alokuje Minimalny rozmiar za odłamek. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : a partycjonujący że alokuje stała liczba odłamków.

Alternatywnie, można użyć MultiWorkerMirroredStrategy obiektu:

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Można zastąpić strategię stosowaną powyżej z MultiWorkerMirroredStrategy obiektu do przeprowadzenia szkolenia z tej strategii.

Podobnie jak w przypadku tf.estimator API, ponieważ MultiWorkerMirroredStrategy to strategia multi-client, nie ma łatwego sposobu, aby uruchomić rozproszoną szkolenia w tej Colab notebooka. Dlatego zastąpienie powyższego kodu tą strategią kończy się uruchamianiem rzeczy lokalnie. Szkolenie Wielu pracownik z Keras Model.fit / A Custom szkoleniowe pętla tutoriale wykazać jak prowadzić szkolenia wielu pracowników z 'TF_CONFIG' zmiennej utworzonej z dwóch pracowników na localhost w Colab. W praktyce, by utworzyć wiele robotników na zewnętrznych adresów IP / portów i użyć 'TF_CONFIG' zmienną określić konfigurację klastra dla każdego pracownika.

Następne kroki

Aby dowiedzieć się więcej na temat multi-pracownika rozproszoną szkolenia z tf.distribute.experimental.ParameterServerStrategy i tf.distribute.MultiWorkerMirroredStrategy w TensorFlow 2, należy rozważyć następujące zasoby: