День сообщества ML - 9 ноября! Присоединяйтесь к нам для обновления от TensorFlow, JAX, и многое другое Подробнее

Перенос обучения ЦП / ГП с несколькими рабочими

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

В этом руководстве показано, как перенести рабочий процесс распределенного обучения с несколькими сотрудниками с TensorFlow 1 на TensorFlow 2.

Чтобы выполнить обучение с несколькими рабочими с помощью CPU / GPU:

Настраивать

Начните с необходимого импорта и простого набора данных для демонстрационных целей:

# Install tf-nightly as the notebook uses a dataset instance for `Model.fit`
# with `ParameterServerStrategy`, which depends on symbols in TF 2.7.
!pip uninstall -q -y tensorflow keras
!pip install -q tf-nightly
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

Вы будете нуждаться в 'TF_CONFIG' конфигурации переменной среды для обучения на нескольких машинах в TensorFlow. Используйте 'TF_CONFIG' , чтобы указать 'cluster' и 'task' ы адреса. (Подробнее в Distributed_training руководстве.)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Используйте del удалить переменную заявление (но в реальном мире обучения нескольких работников в TensorFlow 1, вам не придется делать это):

del os.environ['TF_CONFIG']

TensorFlow 1: распределенное обучение нескольких сотрудников с помощью API tf.estimator

Следующий фрагмент кода демонстрирует канонический рабочий процесс обучения нескольких работников в TF1: вы будете использовать tf.estimator.Estimator , tf.estimator.TrainSpec , tf.estimator.EvalSpec и tf.estimator.train_and_evaluate API для распространения тренировка:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpu5zsvkn2
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpu5zsvkn2', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.0176871, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-22T20:01:54
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.16660s
INFO:tensorflow:Finished evaluation at 2021-09-22-20:01:55
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.0040814565
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.034454126.
({'loss': 0.0040814565, 'global_step': 3}, [])

TensorFlow 2: обучение нескольких сотрудников со стратегиями распространения

В TensorFlow 2, распределенное обучение по нескольким работникам с процессорами, графическими процессорами и TPUs осуществляется через tf.distribute.Strategy s.

В следующем примере показано , как использовать два таких стратегий: tf.distribute.experimental.ParameterServerStrategy и tf.distribute.MultiWorkerMirroredStrategy , оба из которых предназначены для подготовки CPU / GPU с несколькими рабочими.

ParameterServerStrategy использует координатор ( 'chief' ), что делает его более дружественным с окружающей средой в этом Colab ноутбуке. Вы будете использовать некоторые утилиты здесь , чтобы создать опорные элементы необходимые для работоспособного опыта здесь вы будете создавать в процессе кластер, где потоки используются для имитации серверов параметров ( 'ps' ) и рабочих ( 'worker' ) . Для получения дополнительной информации о подготовке сервера параметров, обратитесь к серверу подготовке параметров с ParameterServerStrategy учебником.

В этом примере первое определение 'TF_CONFIG' переменную среды с tf.distribute.cluster_resolver.TFConfigClusterResolver предоставить информацию кластера. Если вы используете систему управления кластером для распределенного обучения, проверьте , если он обеспечивает 'TF_CONFIG' для вас уже, в этом случае вам не нужно явно установить эту переменную окружения. (Узнайте больше в настройке 'TF_CONFIG' переменное окружение секции в распределенной подготовке с TensorFlow руководством) .

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

Затем создайте tf.distribute.Server s для рабочих серверов и параметров один за одним:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

В реальном мире распределены обучение, вместо того чтобы начинать все в tf.distribute.Server сек на координатора, вы будете использовать несколько машин, а также те, которые обозначены как "worker" s и "ps" (серверы параметров) будет каждый запустить tf.distribute.Server . Обратитесь к кластерам в мире реального раздела в Parameter учебного сервера учебник для более подробной информации.

С все готово, создать ParameterServerStrategy объект:

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

После того, как вы создали объект стратегию, определить модель, оптимизатор, и другие переменные, и вызовите Keras Model.compile в Strategy.scope API для распространения обучения. (Обратитесь к Strategy.scope API документации для получения дополнительной информации.)

Если вы хотите , чтобы настроить свое обучение посредством, например, определения вперед и назад пассов, см Обучения с секцией петлевого обучения в пользовательских параметрах обучения сервера учебника для более подробной информации.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-09-22 20:01:56.897008: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.899102: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.914645: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 2s - loss: 0.1728 - 2s/epoch - 176ms/step
Epoch 2/5
10/10 - 0s - loss: 0.0102 - 73ms/epoch - 7ms/step
Epoch 3/5
10/10 - 0s - loss: 0.0091 - 71ms/epoch - 7ms/step
Epoch 4/5
10/10 - 0s - loss: 0.0083 - 71ms/epoch - 7ms/step
Epoch 5/5
10/10 - 0s - loss: 0.0077 - 73ms/epoch - 7ms/step
<keras.callbacks.History at 0x7fec502068d0>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 12s - loss: 0.0761
2021-09-22 20:01:58.817232: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:58.997047: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
8/10 [=======================>......] - ETA: 0s - loss: 0.2278
2021-09-22 20:01:59.177706: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 50ms/step - loss: 0.2356
{'loss': 0.23556283}

( Редактирования разделов tf.distribute.experimental.partitioners )

ParameterServerStrategy в TensorFlow 2 поддерживает переменные разделения и предлагают такое же , как TensorFlow редактирования разделы 1, с менее запутанным названиями: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : а что держит разметки черепков под размером максимального) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : а что выделяет разметки минимального размера за осколок. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : а что выделяет разметки фиксированное число черепков.

Кроме того , вы можете использовать MultiWorkerMirroredStrategy объект:

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Вы можете заменить стратегию , используемую выше , с MultiWorkerMirroredStrategy объектом , чтобы выполнить обучение с этой стратегией.

Как и с tf.estimator APIs, поскольку MultiWorkerMirroredStrategy стратегия мульти-клиент, не существует простой способ для запуска распределенной подготовки в этой Colab ноутбук. Следовательно, замена приведенного выше кода этой стратегией приводит к тому, что все работает локально. Обучение Multi-работник с Keras Model.fit / настраиваемая подготовка цикл учебники показывают , как запустить обучение мульти-мастера , с 'TF_CONFIG' переменным , созданные с двумя рабочими на локальном хосте в Colab. На практике, вы можете создать несколько рабочих на внешних IP - адресов / портов, а также использовать 'TF_CONFIG' переменную , чтобы определить конфигурацию кластера для каждого работника.

Следующие шаги

Чтобы узнать больше о мульти-работнике распределенного обучения с tf.distribute.experimental.ParameterServerStrategy и tf.distribute.MultiWorkerMirroredStrategy в TensorFlow 2, рассмотрят следующие ресурсы: