Szkolenie i obsługa Vertex AI za pomocą TFX i Vertex Pipelines

Ten samouczek oparty na notatniku utworzy i uruchomi potok TFX, który szkoli model ML przy użyciu usługi Vertex AI Training i publikuje go w Vertex AI w celu obsługi.

Notebook ten jest oparty na rurociągu TFX my zbudowany w prosty TFX Pipeline dla Vertex rurociągów Tutorial . Jeśli jeszcze nie przeczytałeś tego samouczka, powinieneś go przeczytać przed kontynuowaniem tego notatnika.

Możesz trenować modele w Vertex AI za pomocą AutoML lub skorzystać z treningu niestandardowego. W szkoleniach niestandardowych możesz wybrać wiele różnych typów maszyn do obsługi zadań szkoleniowych, umożliwić szkolenie rozproszone, korzystać z dostrajania hiperparametrów i przyspieszać za pomocą procesorów GPU.

Możesz również obsługiwać żądania predykcji, wdrażając wyszkolony model w modelach Vertex AI i tworząc punkt końcowy.

W tym samouczku użyjemy szkolenia Vertex AI z zadaniami niestandardowymi, aby trenować model w potoku TFX. Wdrożymy również model do obsługi żądań predykcji za pomocą Vertex AI.

Ten notebook jest przeznaczony do uruchomienia na Google Colab lub AI platformy notebooków . Jeśli nie korzystasz z żadnego z nich, możesz po prostu kliknąć przycisk „Uruchom w Google Colab” powyżej.

Ustawiać

Jeśli masz ukończone Proste TFX Pipeline dla Vertex rurociągów Tutorial , będziesz miał działający projekt GCP i wiadro GCS i to wszystko, czego potrzebujemy do tego tutoriala. Przeczytaj najpierw wstępny samouczek, jeśli go przegapiłeś.

Zainstaluj pakiety Pythona

Zainstalujemy wymagane pakiety Pythona, w tym TFX i KFP, do tworzenia potoków ML i przesyłania zadań do Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Czy uruchomiłeś ponownie środowisko wykonawcze?

Jeśli korzystasz z Google Colab, przy pierwszym uruchomieniu powyższej komórki musisz ponownie uruchomić środowisko wykonawcze, klikając powyżej przycisk „RESTART RUNTIME” lub korzystając z menu „Runtime > Restart runtime...”. Wynika to ze sposobu, w jaki Colab ładuje paczki.

Jeśli nie korzystasz z Colab, możesz ponownie uruchomić środowisko uruchomieniowe za pomocą następującej komórki.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Zaloguj się do Google dla tego notatnika

Jeśli używasz tego notatnika w Colab, uwierzytelnij się za pomocą swojego konta użytkownika:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Jeśli jesteś na AI Platformy notebooki, uwierzytelniania Google Cloud przed uruchomieniem następnej sekcji, uruchamiając

gcloud auth login

W oknie terminala (który można otworzyć za pomocą polecenia Plik> Nowy w menu). Wystarczy to zrobić tylko raz na instancję notatnika.

Sprawdź wersje pakietów.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0
KFP version: 1.8.1

Ustaw zmienne

Poniżej skonfigurujemy kilka zmiennych używanych do dostosowania potoków. Wymagane są następujące informacje:

Przed uruchomieniem go wprowadzić żądane wartości w komórce poniżej.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Zestaw gcloud korzystanie z projektu.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Przygotuj przykładowe dane

Będziemy korzystać z tego samego zestawu danych Palmer Penguins jako Proste TFX Pipeline Tutorial .

W tym zbiorze danych znajdują się cztery cechy liczbowe, które zostały już znormalizowane tak, aby miały zakres [0,1]. Będziemy budować model klasyfikacji który przewiduje species pingwinów.

Musimy zrobić własną kopię zbioru danych. Ponieważ TFX ExampleGen odczytuje dane wejściowe z katalogu, musimy utworzyć katalog i skopiować do niego zestaw danych w GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Rzuć okiem na plik CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Utwórz potok

Nasz rurociąg będzie bardzo podobna do rurociągu stworzyliśmy w prosty TFX Pipeline dla Vertex rurociągów Tutorial . Potok będzie składał się z trzech komponentów: CsvExampleGen, Trainer i Pusher. Ale użyjemy specjalnego komponentu Trainer i Pusher. Składnik Trainer przeniesie obciążenia szkoleniowe do Vertex AI, a składnik Pusher opublikuje wyszkolony model ML do Vertex AI zamiast do systemu plików.

TFX zapewnia specjalny Trainer do złożenia oferty na usługi szkoleniowe Vertex AI Szkoleniowego. Wszystko co musimy zrobić, to użyć Trainer w module rozszerzenia zamiast standardowego Trainer komponentu wraz z niektórych wymaganych parametrów GCP.

W tym samouczku uruchomimy zadania Vertex AI Training tylko przy użyciu procesorów, a następnie GPU.

TFX zapewnia również specjalną Pusher aby przesłać model do Vertex AI Models. Pusher stworzy Vertex AI Endpoint zasób służyć perdictions internetowych, too. Patrz dokumentacja Vertex AI , aby dowiedzieć się więcej na temat przepowiedni internetowych świadczonych przez Vertex AI.

Napisz kod modelu.

Sam model jest prawie podobna do modelu w prosty TFX Pipeline Tutorial .

Dodamy _get_distribution_strategy() funkcji, która tworzy strategię dystrybucji TensorFlow i stosuje się go w run_fn używać MirroredStrategy jeśli GPU jest dostępna.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Skopiuj plik modułu do GCS, do którego można uzyskać dostęp z komponentów potoku.

W przeciwnym razie możesz chcieć skompilować obraz kontenera zawierający plik modułu i użyć obrazu do uruchomienia zadań potoku i szkolenia AI Platform.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Napisz definicję potoku

Zdefiniujemy funkcję do tworzenia potoku TFX. Posiada te same trzy składniki jak w prosty TFX Pipeline tutoriala , ale używamy Trainer i Pusher komponent w module rozszerzenia GCP.

tfx.extensions.google_cloud_ai_platform.Trainer zachowuje się jak zwykły Trainer , ale to przesuwa obliczeń dla modelu kształcenia do chmury. Uruchamia niestandardowe zadanie w usłudze Vertex AI Training, a komponent trenera w systemie orkiestracji będzie po prostu czekać, aż zadanie Vertex AI Training się zakończy.

tfx.extensions.google_cloud_ai_platform.Pusher tworzy Vertex AI wzorem i Vertex AI Endpoint pomocą przeszkolonego model.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Uruchom potok na Vertex Pipelines.

Użyjemy Vertex Rurociągi do uruchomienia rurociągu jak my w prosty TFX rurociągu Vertex rurociągów Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

Wygenerowany plik definicji mogą być składane za pomocą Google Cloud klienta aiplatform w google-cloud-aiplatform opakowaniu.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Teraz możesz odwiedzić 'Vertex AI> rurociągu w Google Cloud Console , aby zobaczyć postępy.

Przetestuj z żądaniem prognozy

Gdy zakończy rurociągowych, znajdziesz wdrożonego modelu w jednym z punktów końcowych w „Vertex AI> punktów końcowych”. Musimy znać identyfikator punktu końcowego, aby wysłać żądanie przewidywania do nowego punktu końcowego. Różni się to od nazwy punktu końcowego weszliśmy powyżej. Można znaleźć identyfikator na stronie Endpoints w Google Cloud Console , wygląda bardzo długi numer.

Ustaw ENDPOINT_ID poniżej przed uruchomieniem.

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

Używamy tego samego klienta aiplatform do wysyłania żądania do punktu końcowego. Wyślemy prośbę o prognozę dotyczącą klasyfikacji gatunków pingwinów. Dane wejściowe to cztery cechy, których użyliśmy, a model zwróci trzy wartości, ponieważ nasz model wyprowadza jedną wartość dla każdego gatunku.

Na przykład następujący konkretny przykład ma największą wartość w indeksie „2” i wyświetli „2”.

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

Szczegółowe informacje na temat przewidywania internetowego, odwiedź stronę Punkty końcowe w Google Cloud Console . możesz znaleźć przewodnik dotyczący wysyłania przykładowych próśb i linki do innych zasobów.

Uruchom potok za pomocą GPU

Vertex AI obsługuje szkolenia przy użyciu różnych typów maszyn, w tym obsługi procesorów graficznych. Zobacz maszynie Spec odniesienie do dostępnych opcji.

Zdefiniowaliśmy już nasz potok do obsługi trenowania GPU. Wszystko, co musimy zrobić, to ustawienie use_gpu flagę True. Następnie rurociąg zostanie utworzony ze spec maszynowego w tym jeden NVIDIA_TESLA_K80 i nasz model kod szkoleniowy użyje tf.distribute.MirroredStrategy .

Zauważ, że use_gpu flaga nie jest częścią Vertex lub TFX API. Służy tylko do kontrolowania kodu szkoleniowego w tym samouczku.

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Teraz możesz odwiedzić 'Vertex AI> rurociągu w Google Cloud Console , aby zobaczyć postępy.

Sprzątanie

W tym samouczku utworzyłeś model Vertex AI i punkt końcowy. Proszę usunąć te zasoby, aby uniknąć niepotrzebnych opłat, przechodząc do punktów końcowych i sprężenie model z końcowego pierwszy. Następnie możesz osobno usunąć punkt końcowy i model.