Обучение и обслуживание Vertex AI с помощью TFX и Vertex Pipelines

В этом руководстве на основе записной книжки будет создан и запущен конвейер TFX, который обучает модель машинного обучения с помощью службы Vertex AI Training и публикует ее в Vertex AI для обслуживания.

Этот ноутбук на основе TFX трубопровода мы построили в простом TFX трубопроводе для вершинных конвейеров Учебного пособия . Если вы еще не читали это руководство, вам следует прочитать его, прежде чем продолжить работу с этой записной книжкой.

Вы можете обучать модели Vertex AI с помощью AutoML или использовать индивидуальное обучение. В настраиваемом обучении вы можете выбрать множество различных типов машин для выполнения ваших учебных заданий, включить распределенное обучение, использовать настройку гиперпараметров и ускорить работу с помощью графических процессоров.

Вы также можете обслуживать запросы прогнозирования, развернув обученную модель в Vertex AI Models и создав конечную точку.

В этом руководстве мы будем использовать Vertex AI Training с настраиваемыми заданиями для обучения модели в конвейере TFX. Мы также развернем модель для обслуживания запросов прогнозов с использованием Vertex AI.

Этот ноутбук предназначен для запуска на Google Colab или AI платформ ноутбуков . Если вы не используете один из них, вы можете просто нажать кнопку «Запустить в Google Colab» выше.

Настраивать

Если вы закончили Простой TFX трубопроводов для вершинных конвейеров Tutorial , вы будете иметь рабочий проект GCP и GCS ведро , и это все , что нам нужно для этого урока. Пожалуйста, прочтите предварительное руководство, если вы его пропустили.

Установить пакеты Python

Мы установим необходимые пакеты Python, включая TFX и KFP, для создания конвейеров машинного обучения и отправки заданий в Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Вы перезапускали среду выполнения?

Если вы используете Google Colab, при первом запуске указанной выше ячейки вы должны перезапустить среду выполнения, нажав кнопку «ПЕРЕЗАГРУЗИТЬ ВРЕМЯ» выше или используя меню «Время выполнения> Перезапустить среду выполнения ...». Это связано с тем, как Colab загружает пакеты.

Если вы не используете Colab, вы можете перезапустить среду выполнения со следующей ячейкой.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Войдите в Google для этого блокнота

Если вы используете этот блокнот на Colab, авторизуйтесь со своей учетной записью:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Если вы на AI платформы ноутбуков, аутентифицировавшимся Google Cloud перед запуском в следующем разделе, запустив

gcloud auth login

в терминальном окне (который можно открыть с помощью File> New в меню). Вам нужно сделать это только один раз для каждого экземпляра записной книжки.

Проверьте версии пакета.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Настроить переменные

Мы настроим некоторые переменные, используемые для настройки конвейеров ниже. Требуется следующая информация:

Введите требуемые значения в ячейке ниже перед запуском.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Набор gcloud использовать свой проект.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Подготовить пример данных

Мы будем использовать тот же Палмер Пингвины набор данных как простой TFX Pipeline Учебник .

В этом наборе данных есть четыре числовых объекта, которые уже были нормализованы до диапазона [0,1]. Мы будем строить модель классификации , которая предсказывает species пингвинов.

Нам нужно сделать нашу собственную копию набора данных. Поскольку TFX ExampleGen считывает входные данные из каталога, нам нужно создать каталог и скопировать в него набор данных на GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Взгляните на файл CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Создать конвейер

Наш трубопровод будет очень похож на трубопровод , который мы создали в простом TFX трубопроводе для вершинных конвейеров Учебного пособия . Конвейер будет состоять из трех компонентов: CsvExampleGen, Trainer и Pusher. Но мы воспользуемся специальными компонентами Trainer и Pusher. Компонент Trainer переместит учебные рабочие нагрузки в Vertex AI, а компонент Pusher опубликует обученную модель машинного обучения в Vertex AI вместо файловой системы.

TFX предоставляет специальный Trainer для представления учебных рабочих мест для обслуживания Vertex AI Training. Все , что нужно сделать , это использовать Trainer в модуле расширения вместо стандартного Trainer компонент наряду с некоторыми необходимыми параметрами GCP.

В этом руководстве мы будем запускать задания Vertex AI Training только с использованием сначала ЦП, а затем с помощью графического процессора.

TFX также предоставляет специальный Pusher для загрузки модели для Vertex AI моделей. Pusher создаст Vertex AI Endpoint ресурс для обслуживания онлайн perdictions тоже. Смотрите документацию Vertex AI более узнать о онлайн предсказания , предоставляемые Vertex AI.

Напишите код модели.

Сама модель практически аналогична модели в простой TFX Pipeline Учебник .

Мы добавим _get_distribution_strategy() функцию , которая создает стратегию распределения TensorFlow и используется в run_fn использовать MirroredStrategy если GPU доступен.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Скопируйте файл модуля в GCS, к которому можно получить доступ из компонентов конвейера.

В противном случае вы можете создать образ контейнера, включая файл модуля, и использовать этот образ для запуска конвейера и заданий по обучению AI Platform.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Напишите определение конвейера

Мы определим функцию для создания конвейера TFX. Он имеет те же три компоненты , как и в простом TFX Pipeline Учебник , но мы используем Trainer и Pusher компонент в модуле расширения GCP.

tfx.extensions.google_cloud_ai_platform.Trainer ведет себя как обычный Trainer , но он просто перемещает вычисление для модели подготовки к облаку. Он запускает настраиваемое задание в сервисе Vertex AI Training, а обучающий компонент в системе оркестровки просто ожидает завершения задания Vertex AI Training.

tfx.extensions.google_cloud_ai_platform.Pusher создает Vertex AI Модель и Vertex AI Конточка с помощью обученной модели.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Запустите конвейер на Vertex Pipelines.

Мы будем использовать вершинные конвейеры для запуска трубопровода , как мы делали в простом TFX трубопроводе для вершинных конвейеров Учебного пособия .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

Файл генерируется определение может быть представлено с помощью aiplatform клиента Google Cloud в google-cloud-aiplatform пакет.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Теперь вы можете посетить «Vertex AI> Трубопроводы» в Google Cloud Console , чтобы увидеть прогресс.

Тест с запросом прогноза

После того , как Завершает трубопровод, вы найдете развернутую модель на одной из конечных точек в «Vertex AI> Endpoints». Нам нужно знать идентификатор конечной точки, чтобы отправить запрос прогноза на новую конечную точку. Это отличается от названия конечных точек мы вошли выше. Вы можете найти идентификатор на странице Endpoints в Google Cloud Console , это выглядит как очень длинное число.

Перед запуском установите ENDPOINT_ID ниже.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Мы используем тот же клиент aiplatform для отправки запроса на конечную точку. Мы отправим запрос на прогноз классификации видов пингвинов. Входными данными являются четыре функции, которые мы использовали, и модель вернет три значения, потому что наша модель выводит по одному значению для каждого вида.

Например, в следующем конкретном примере наибольшее значение имеет индекс «2» и будет напечатано «2».

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Для получения более подробной информации о онлайн предсказании, пожалуйста , посетите страницу Endpoints в Google Cloud Console . вы можете найти руководство по отправке образцов запросов и ссылки на другие ресурсы.

Запустите конвейер с помощью графического процессора

Vertex AI поддерживает обучение с использованием различных типов машин, включая поддержку графических процессоров. См Machine SPEC ссылки для доступных опций.

Мы уже определили наш конвейер для поддержки обучения GPU. Все , что нам нужно сделать , это установка use_gpu флаг True. Затем трубопровод будет создан с машиной спецификации , включая один NVIDIA_TESLA_K80 и наш код модели обучения будет использовать tf.distribute.MirroredStrategy .

Обратите внимание , что use_gpu флаг не является частью Vertex или TFX API. Он просто используется для управления обучающим кодом в этом руководстве.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Теперь вы можете посетить «Vertex AI> Трубопроводы» в Google Cloud Console , чтобы увидеть прогресс.

Убираться

В этом руководстве вы создали Vertex AI Model и Endpoint. Пожалуйста , удалите эти ресурсы , чтобы избежать каких - либо нежелательных расходов, перейдя в Endpoints и отмены развертывания модели с конечной точки первой. Затем вы можете удалить конечную точку и модель отдельно.