Entrenamiento y servicio de Vertex AI con TFX y Vertex Pipelines

Este tutorial basado en notebook creará y ejecutará una canalización TFX que entrena un modelo ML usando el servicio Vertex AI Training y lo publica en Vertex AI para servir.

Este portátil se basa en la tubería TFX hemos construido en simple TFX tubería por Vertex Tuberías Tutorial . Si aún no ha leído ese tutorial, debe leerlo antes de continuar con este cuaderno.

Puede entrenar modelos en Vertex AI usando AutoML o usar entrenamiento personalizado. En el entrenamiento personalizado, puede seleccionar muchos tipos de máquinas diferentes para potenciar sus trabajos de entrenamiento, habilitar el entrenamiento distribuido, usar el ajuste de hiperparámetros y acelerar con GPU.

También puede atender solicitudes de predicción implementando el modelo entrenado en Vertex AI Models y creando un punto final.

En este tutorial, usaremos Vertex AI Training con trabajos personalizados para entrenar un modelo en una canalización TFX. También implementaremos el modelo para atender la solicitud de predicción utilizando Vertex AI.

Este portátil está diseñado para ejecutarse en Google Colab o en Cuadernos de la plataforma de IA . Si no está utilizando uno de estos, simplemente puede hacer clic en el botón "Ejecutar en Google Colab" arriba.

Configurar

Si ha completado simple TFX tubería por Vertex Tuberías Tutorial , usted tiene un proyecto GCP de trabajo y un cubo de GCS y eso es todo lo que necesitamos para este tutorial. Lea primero el tutorial preliminar si se lo perdió.

Instalar paquetes de python

Instalaremos los paquetes de Python necesarios, incluidos TFX y KFP, para crear canalizaciones de aprendizaje automático y enviar trabajos a Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

¿Reiniciaste el tiempo de ejecución?

Si está utilizando Google Colab, la primera vez que ejecuta la celda anterior, debe reiniciar el tiempo de ejecución haciendo clic en el botón "REINICIAR TIEMPO DE EJECUCIÓN" o usando el menú "Tiempo de ejecución > Reiniciar tiempo de ejecución...". Esto se debe a la forma en que Colab carga los paquetes.

Si no está en Colab, puede reiniciar el tiempo de ejecución con la siguiente celda.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Iniciar sesión en Google para este cuaderno

Si está ejecutando este cuaderno en Colab, autentíquese con su cuenta de usuario:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Si se encuentra en la plataforma IA cuadernos, autenticarse con la nube de Google antes de ejecutar la siguiente sección, mediante la ejecución

gcloud auth login

en la ventana de terminal (que se puede abrir a través de Archivo> Nuevo en el menú). Solo necesita hacer esto una vez por instancia de notebook.

Compruebe las versiones del paquete.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Configurar variables

Configuraremos algunas variables utilizadas para personalizar las canalizaciones a continuación. Se requiere la siguiente información:

Introduzca los valores requeridos en la celda de abajo antes de ejecutarlo.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Conjunto gcloud utilizar su proyecto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Preparar datos de ejemplo

Vamos a utilizar el mismo conjunto de datos Palmer Pingüinos tan simple TFX Pipeline Tutorial .

Hay cuatro características numéricas en este conjunto de datos que ya se normalizaron para tener un rango [0,1]. Vamos a construir un modelo de clasificación que predice las species de pingüinos.

Necesitamos hacer nuestra propia copia del conjunto de datos. Debido a que TFX ExampleGen lee las entradas de un directorio, necesitamos crear un directorio y copiarle un conjunto de datos en GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Eche un vistazo rápido al archivo CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Crear una canalización

Nuestra tubería será muy similar a la tubería que hemos creado en simple TFX tubería por Vertex Tuberías Tutorial . La canalización constará de tres componentes, CsvExampleGen, Trainer y Pusher. Pero usaremos un componente especial de Entrenador y Empujador. El componente Trainer moverá las cargas de trabajo de entrenamiento a Vertex AI, y el componente Pusher publicará el modelo de aprendizaje automático entrenado en Vertex AI en lugar de un sistema de archivos.

TFX proporciona una especial Trainer para enviar trabajos de capacitación para el servicio Vértice Training AI. Todo lo que tenemos que hacer es usar Trainer en el módulo de extensión en lugar del estándar Trainer componente junto con algunos parámetros GCP requeridos.

En este tutorial, ejecutaremos trabajos de Vertex AI Training solo usando CPU primero y luego con una GPU.

TFX también proporciona una especial Pusher para cargar el modelo de modelos Vertex AI. Pusher creará Vertex AI punto final de recursos para servir perdictions en línea, también. Consulte la documentación de Vertex AI para aprender más acerca de las predicciones en línea proporcionados por Vertex AI.

Escriba el código del modelo.

El modelo en sí es casi similar al modelo de simple TFX Pipeline Tutorial .

Vamos a añadir _get_distribution_strategy() la función que crea una estrategia de distribución TensorFlow y se utiliza en run_fn utilizar MirroredStrategy si la GPU está disponible.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copie el archivo del módulo en GCS, al que se puede acceder desde los componentes de la canalización.

De lo contrario, es posible que desee crear una imagen de contenedor que incluya el archivo del módulo y usar la imagen para ejecutar la canalización y los trabajos de capacitación de AI Platform.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Escribir una definición de canalización

Definiremos una función para crear una canalización TFX. Tiene los mismos tres componentes que en la simple TFX Pipeline Tutorial , pero nosotros usamos un Trainer y Pusher componente en el módulo de extensión GCP.

tfx.extensions.google_cloud_ai_platform.Trainer se comporta como un habitual Trainer , sino que sólo se mueve el cálculo para el modelo de formación de la nube. Inicia un trabajo personalizado en el servicio Vertex AI Training y el componente de entrenador en el sistema de orquestación solo esperará hasta que se complete el trabajo de Vertex AI Training.

tfx.extensions.google_cloud_ai_platform.Pusher crea una IA Modelo de vértices y un vértice AI punto final mediante el modelo entrenado.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Ejecute la canalización en Vertex Pipelines.

Vamos a utilizar Vertex Tuberías para ejecutar la tubería como lo hicimos en simple TFX tubería por Vertex Tuberías Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

El archivo de definición generada puede ser presentada mediante Google Cloud cliente aiplatform en google-cloud-aiplatform paquete.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ahora se puede visitar 'Vertex AI> Tuberías' en Google Cloud Console para ver el progreso.

Prueba con una solicitud de predicción

Una vez que se complete el oleoducto, encontrará un modelo desplegado en el uno de los puntos finales en 'Vertex AI> Endpoints'. Necesitamos saber la identificación del punto final para enviar una solicitud de predicción al nuevo punto final. Esto es diferente del nombre de punto final entramos anteriormente. Puede encontrar la identificación en la página de puntos finales en Google Cloud Console , parece que un número muy largo.

Establezca ENDPOINT_ID a continuación antes de ejecutarlo.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Usamos el mismo cliente de aiplatform para enviar una solicitud al punto final. Enviaremos una solicitud de predicción para la clasificación de especies de pingüinos. La entrada son las cuatro características que usamos, y el modelo devolverá tres valores, porque nuestro modelo genera un valor para cada especie.

Por ejemplo, el siguiente ejemplo específico tiene el valor más grande en el índice '2' e imprimirá '2'.

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Para obtener información detallada acerca de la predicción en línea, por favor visite la página de puntos finales en Google Cloud Console . puede encontrar una guía sobre cómo enviar solicitudes de muestra y enlaces a más recursos.

Ejecute la canalización con una GPU

Vertex AI admite el entrenamiento con varios tipos de máquinas, incluida la compatibilidad con GPU. Ver referencia Máquina de especificaciones para las opciones disponibles.

Ya definimos nuestra canalización para admitir el entrenamiento de GPU. Todo lo que tenemos que hacer es establecer use_gpu indicador en True. A continuación, una tubería se creará con una especificación de la máquina incluyendo una NVIDIA_TESLA_K80 y nuestro código de modelo de formación utilizará tf.distribute.MirroredStrategy .

Tenga en cuenta que use_gpu bandera no es una parte de la API de vértice o TFX. Solo se usa para controlar el código de entrenamiento en este tutorial.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ahora se puede visitar 'Vertex AI> Tuberías' en Google Cloud Console para ver el progreso.

Limpiar

Ha creado un modelo Vertex AI y un punto final en este tutorial. Por favor, elimine estos recursos para evitar cargos no deseados yendo a puntos finales y anulación de la implementación del modelo desde el punto final primera. Luego puede eliminar el punto final y el modelo por separado.