Entrenamiento Distribuido

La capacitación distribuida es un tipo de capacitación modelo en la que los requisitos de recursos informáticos (por ejemplo, CPU, RAM) se distribuyen entre varias computadoras. La capacitación distribuida permite entrenar más rápido y en conjuntos de datos más grandes (hasta unos pocos miles de millones de ejemplos).

La capacitación distribuida también es útil para la optimización automatizada de hiperparámetros donde se entrenan múltiples modelos en paralelo.

En este documento aprenderá cómo:

  • Entrene un modelo TF-DF mediante entrenamiento distribuido.
  • Ajuste los hiperparámetros de un modelo TF-DF mediante entrenamiento distribuido.

Limitaciones

A partir de ahora, se admite la formación distribuida para:

  • Entrenamiento de modelos de árboles potenciados por gradiente con tfdf.keras.DistributedGradientBoostedTreesModel . Los modelos de árboles potenciados con gradiente distribuido son equivalentes a sus homólogos no distribuidos.
  • Búsqueda de hiperparámetros para cualquier tipo de modelo TF-DF.

Cómo habilitar la capacitación distribuida

Esta sección enumera los pasos para habilitar la capacitación distribuida. Para ver ejemplos completos, consulte la siguiente sección.

Alcance de ParameterServerStrategy

El modelo y el conjunto de datos se definen en un alcance ParameterServerStrategy .

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

Formato de conjunto de datos

Al igual que para la capacitación no distribuida, los conjuntos de datos se pueden proporcionar como

  1. Un conjunto de datos distribuido de flujo tensorial finito, o
  2. una ruta a los archivos del conjunto de datos utilizando uno de los formatos de conjuntos de datos compatibles .

Usar archivos fragmentados es significativamente más simple que usar el enfoque de conjunto de datos distribuido de flujo tensorial finito (1 línea frente a ~20 líneas de código). Sin embargo, solo el enfoque del conjunto de datos de tensorflow admite el preprocesamiento de TensorFlow. Si su canalización no contiene ningún preprocesamiento, se recomienda la opción de conjunto de datos fragmentados.

En ambos casos, el conjunto de datos debe dividirse en varios archivos para distribuir la lectura del conjunto de datos de manera eficiente.

Trabajadores de configuración

Un proceso principal es el programa que ejecuta el código Python que define el modelo TensorFlow. Este proceso no ejecuta ningún cálculo pesado. El cómputo de la formación efectiva lo realizan los trabajadores . Los trabajadores son procesos que ejecutan un servidor de parámetros de TensorFlow.

El jefe debe estar configurado con la dirección IP de los trabajadores. Esto se puede hacer usando la variable de entorno TF_CONFIG o creando un ClusterResolver . Consulte Capacitación del servidor de parámetros con ParameterServerStrategy para obtener más detalles.

ParameterServerStrategy de TensorFlow define dos tipos de trabajadores: "trabajadores" y "servidor de parámetros". TensorFlow requiere que se cree una instancia de al menos uno de cada tipo de trabajador. Sin embargo, TF-DF sólo utiliza "trabajadores". Por lo tanto, es necesario crear una instancia de un "servidor de parámetros", pero TF-DF no lo utilizará. Por ejemplo, la configuración de una capacitación TF-DF podría verse de la siguiente manera:

  • 1 jefe
  • 50 trabajadores
  • 1 servidor de parámetros

Los trabajadores necesitan acceso a las operaciones de capacitación personalizadas de TensorFlow Decision Forests. Hay dos opciones para habilitar el acceso:

  1. Utilice el servidor de parámetros TF-DF C++ preconfigurado //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server .
  2. Cree un servidor de parámetros llamando tf.distribute.Server() . En este caso, se debe importar TF-DF import tensorflow_decision_forests .

Ejemplos

Esta sección muestra ejemplos completos de configuraciones de capacitación distribuida. Para obtener más ejemplos, consulte las pruebas unitarias TF-DF .

Ejemplo: entrenamiento distribuido en la ruta del conjunto de datos

Divida su conjunto de datos en un conjunto de archivos fragmentados utilizando uno de los formatos de conjuntos de datos compatibles . Se recomienda nombrar los archivos de la siguiente manera: /path/to/dataset/train-<5 digit index>-of-<total files> , por ejemplo

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

Para lograr la máxima eficiencia, la cantidad de archivos debe ser al menos 10 veces la cantidad de trabajadores. Por ejemplo, si está capacitando a 100 trabajadores, asegúrese de que el conjunto de datos esté dividido en al menos 1000 archivos.

Luego se puede hacer referencia a los archivos con una expresión de fragmentación como:

  • /ruta/al/conjunto de datos/tren@1000
  • /ruta/al/conjunto de datos/tren@*

La formación distribuida se realiza de la siguiente manera. En este ejemplo, el conjunto de datos se almacena como un TFRecord de ejemplos de TensorFlow (definido por la clave tfrecord+tfe ).

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

Ejemplo: entrenamiento distribuido en un conjunto de datos distribuido finito de TensorFlow

TF-DF espera un conjunto de datos de TensorFlow distribuido, finito y fragmentado por trabajadores:

  • Distribuido : un conjunto de datos no distribuido está incluido en strategy.distribute_datasets_from_function .
  • finito : el conjunto de datos debe leer cada ejemplo exactamente una vez. El conjunto de datos no debe contener instrucciones repeat .
  • trabajador fragmentado : cada trabajador debe leer una parte separada del conjunto de datos.

Aquí hay un ejemplo:

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

Ejemplo: ajuste distribuido de hiperparámetros en la ruta de un conjunto de datos

El ajuste distribuido de hiperparámetros en la ruta de un conjunto de datos es similar al entrenamiento distribuido. La única diferencia es que esta opción es compatible con modelos no distribuidos. Por ejemplo, puede distribuir el ajuste de hiperparámetros del modelo de árboles potenciados por gradiente (no distribuido).

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

Ejemplo: pruebas unitarias

Para realizar pruebas unitarias de la capacitación distribuida, puede crear procesos de trabajo simulados. Consulte el método _create_in_process_tf_ps_cluster en las pruebas unitarias TF-DF para obtener más información.