Vertex AI Training e Serving con TFX e Vertex Pipelines

Questo tutorial basato su notebook creerà ed eseguirà una pipeline TFX che addestra un modello ML utilizzando il servizio Vertex AI Training e lo pubblica su Vertex AI per il servizio.

Questo notebook è basato sul gasdotto TFX abbiamo costruito in semplice TFX Pipeline per Vertex Pipelines Tutorial . Se non hai ancora letto quel tutorial, dovresti leggerlo prima di procedere con questo quaderno.

Puoi addestrare i modelli su Vertex AI utilizzando AutoML o utilizzare l'addestramento personalizzato. Nella formazione personalizzata, puoi selezionare molti diversi tipi di macchine per potenziare i tuoi lavori di formazione, abilitare la formazione distribuita, utilizzare l'ottimizzazione degli iperparametri e accelerare con le GPU.

Puoi anche servire le richieste di previsione distribuendo il modello addestrato a Vertex AI Models e creando un endpoint.

In questo tutorial, utilizzeremo Vertex AI Training con lavori personalizzati per addestrare un modello in una pipeline TFX. Distribuiremo anche il modello per servire la richiesta di previsione utilizzando Vertex AI.

Questo notebook è progettato per essere eseguito su Google Colab o IA della piattaforma notebook . Se non stai utilizzando uno di questi, puoi semplicemente fare clic sul pulsante "Esegui in Google Colab" sopra.

Impostare

Se avete completato semplice TFX Pipeline per Vertex Pipelines Tutorial , si avrà un progetto GCP di lavoro e un secchio GCS e che è tutto abbiamo bisogno per questo tutorial. Si prega di leggere prima il tutorial preliminare se ve lo siete perso.

Installa i pacchetti Python

Installeremo i pacchetti Python richiesti, inclusi TFX e KFP, per creare pipeline ML e inviare lavori a Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Hai riavviato il runtime?

Se stai utilizzando Google Colab, la prima volta che esegui la cella in alto, devi riavviare il runtime facendo clic sul pulsante "RIAVVIA RUNTIME" sopra o utilizzando il menu "Runtime > Riavvia runtime ...". Ciò è dovuto al modo in cui Colab carica i pacchetti.

Se non sei su Colab, puoi riavviare il runtime con la seguente cella.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Accedi a Google per questo blocco note

Se stai eseguendo questo notebook su Colab, autenticati con il tuo account utente:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Se siete su AI piattaforma notebook, autenticarsi con Google Cloud prima di eseguire la sezione successiva, eseguendo

gcloud auth login

nella finestra di Terminale (che è possibile aprire tramite File> Nuovo nel menu). Devi farlo solo una volta per istanza di notebook.

Controlla le versioni del pacchetto.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Imposta variabili

Di seguito imposteremo alcune variabili utilizzate per personalizzare le pipeline. Sono necessarie le seguenti informazioni:

Immettere i valori richiesti nella cella di seguito prima di eseguirlo.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Set gcloud di utilizzare il progetto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Prepara dati di esempio

Useremo lo stesso Palmer Pinguini set di dati come semplice TFX Pipeline Tutorial .

Ci sono quattro caratteristiche numeriche in questo set di dati che erano già normalizzate per avere un intervallo [0,1]. Costruiremo un modello di classificazione che prevede le species di pinguini.

Dobbiamo creare la nostra copia del set di dati. Poiché TFX ExampleGen legge gli input da una directory, è necessario creare una directory e copiarvi il set di dati su GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Dai una rapida occhiata al file CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Crea una pipeline

La nostra pipeline sarà molto simile al gasdotto che abbiamo creato in semplice TFX Pipeline per Vertex Pipelines Tutorial . La pipeline sarà composta da tre componenti, CsvExampleGen, Trainer e Pusher. Ma useremo uno speciale componente Trainer e Pusher. Il componente Trainer sposterà i carichi di lavoro di addestramento su Vertex AI e il componente Pusher pubblicherà il modello ML addestrato su Vertex AI invece di un filesystem.

TFX fornisce uno speciale Trainer per inviare i lavori di formazione per il servizio Vertex AI Formazione. Tutto ciò che dobbiamo fare è l'uso Trainer nel modulo di estensione invece dello standard Trainer componente insieme ad alcuni parametri GCP richiesti.

In questo tutorial, eseguiremo i lavori Vertex AI Training solo utilizzando prima le CPU e poi con una GPU.

TFX fornisce anche una speciale Pusher per caricare il modello ai modelli Vertex AI. Pusher creerà risorsa Vertex AI Endpoint per servire perdictions on-line, anche. Vedere la documentazione Vertex AI per conoscere meglio le previsioni online forniti da Vertex AI.

Scrivi il codice del modello.

Il modello in sé è quasi simile al modello di semplice TFX Pipeline Tutorial .

Noi aggiungeremo _get_distribution_strategy() funzione che crea una strategia di distribuzione tensorflow ed è utilizzato in run_fn utilizzare MirroredStrategy se GPU è disponibile.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copia il file del modulo in GCS a cui è possibile accedere dai componenti della pipeline.

In caso contrario, potresti voler creare un'immagine del contenitore che includa il file del modulo e utilizzare l'immagine per eseguire la pipeline e i processi di formazione di AI Platform.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Scrivi una definizione di pipeline

Definiremo una funzione per creare una pipeline TFX. Ha le stesse tre componenti, come nel semplice TFX Pipeline Tutorial , ma usiamo un Trainer e Pusher componente del modulo di estensione GCP.

tfx.extensions.google_cloud_ai_platform.Trainer si comporta come un normale Trainer , ma semplicemente sposta il calcolo per la formazione modello di cloud. Avvia un lavoro personalizzato nel servizio Vertex AI Training e il componente trainer nel sistema di orchestrazione attenderà solo il completamento del lavoro Vertex AI Training.

tfx.extensions.google_cloud_ai_platform.Pusher crea un Vertex AI modello e un vertice AI endpoint utilizzando il modello addestrato.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Eseguire la pipeline su Vertex Pipelines.

Useremo Vertex Condotte per eseguire il gasdotto come abbiamo fatto nel semplice TFX Pipeline per Vertex Pipelines Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

Il file di definizione generato può essere presentata utilizzando Google Cloud cliente aiplatform in google-cloud-aiplatform pacchetto.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ora è possibile visitare 'Vertex AI> Pipelines' in Google Cloud Console per vedere i progressi.

Prova con una richiesta di previsione

Una volta che il completamento del gasdotto, troverete un modello implementato in uno dei punti finali in 'Vertex AI> Endpoint'. Abbiamo bisogno di conoscere l'id dell'endpoint per inviare una richiesta di previsione al nuovo endpoint. Questo è diverso dal nome dell'endpoint siamo entrati in precedenza. È possibile trovare l'ID alla pagina di Endpoint in Google Cloud Console , sembra una lunghissima serie.

Imposta ENDPOINT_ID di seguito prima di eseguirlo.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Usiamo lo stesso client aiplatform per inviare una richiesta all'endpoint. Invieremo una richiesta di previsione per la classificazione delle specie di pinguini. L'input sono le quattro caratteristiche che abbiamo usato e il modello restituirà tre valori, perché il nostro modello emette un valore per ogni specie.

Ad esempio, il seguente esempio specifico ha il valore più grande all'indice '2' e stamperà '2'.

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Per informazioni dettagliate sulla previsione in linea, visita la pagina di Endpoint in Google Cloud Console . puoi trovare una guida sull'invio di richieste di campioni e collegamenti a ulteriori risorse.

Esegui la pipeline utilizzando una GPU

Vertex AI supporta l'addestramento utilizzando vari tipi di macchine, incluso il supporto per le GPU. Vedi di riferimento Macchina specifica per le opzioni disponibili.

Abbiamo già definito la nostra pipeline per supportare l'addestramento GPU. Tutto quello che dobbiamo fare è impostare use_gpu bandiera su True. Poi un gasdotto verrà creato con una specifica macchina di cui uno NVIDIA_TESLA_K80 e il nostro codice di formazione modello utilizzerà tf.distribute.MirroredStrategy .

Si noti che use_gpu bandiera non è una parte della API Vertex o TFX. Viene utilizzato solo per controllare il codice di addestramento in questo tutorial.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ora è possibile visitare 'Vertex AI> Pipelines' in Google Cloud Console per vedere i progressi.

Pulire

Hai creato un modello AI Vertex e un endpoint in questo tutorial. Si prega di eliminare queste risorse per evitare eventuali spese indesiderate andando a endpoint e undeploying il modello dal punto finale prima. Quindi puoi eliminare l'endpoint e il modello separatamente.