Treten Sie der SIG TFX-Addons-Community bei und helfen Sie mit, TFX noch besser zu machen! SIG TFX-Addons beitreten

Vertex AI Training mit TFX und Vertex Pipelines

In diesem Notebook-basierten Tutorial wird eine TFX-Pipeline erstellt und ausgeführt, die ein ML-Modell mit dem Vertex AI Training-Dienst trainiert.

Dieses Notebook basiert auf der TFX - Pipeline wir in gebaut Einfacher TFX Pipeline für Vertex Pipelines Tutorial . Wenn Sie dieses Tutorial noch nicht gelesen haben, sollten Sie es lesen, bevor Sie mit diesem Notizbuch fortfahren.

Sie können Modelle auf Vertex AI mit AutoML trainieren oder benutzerdefiniertes Training verwenden. Beim benutzerdefinierten Training können Sie viele verschiedene Maschinentypen auswählen, um Ihre Trainingsjobs zu unterstützen, verteiltes Training zu ermöglichen, Hyperparameter-Tuning zu verwenden und mit GPUs zu beschleunigen.

In diesem Tutorial verwenden wir Vertex AI Training mit benutzerdefinierten Jobs, um ein Modell in einer TFX-Pipeline zu trainieren.

Das Notebook soll auf ausgeführt werden Google Colab oder auf AI - Plattform Notebooks . Wenn Sie keines davon verwenden, können Sie einfach oben auf die Schaltfläche "In Goolge Colab ausführen" klicken.

Installieren

Wenn Sie abgeschlossen haben Einfache TFX Pipeline für Vertex Pipelines Tutorial werden Sie ein funktionierendes GCP - Projekt und einen GCS Eimer und das ist alles , was wir für dieses Tutorial benötigen. Bitte lesen Sie zuerst das vorläufige Tutorial, wenn Sie es verpasst haben.

Python-Pakete installieren

Wir installieren erforderliche Python-Pakete, einschließlich TFX und KFP, um ML-Pipelines zu erstellen und Jobs an Vertex Pipelines zu senden.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade tfx==0.30.0 kfp==1.6.1

Hast du die Laufzeit neu gestartet?

Wenn Sie Google Colab verwenden und die obige Zelle zum ersten Mal ausführen, müssen Sie die Laufzeit neu starten, indem Sie oben auf die Schaltfläche "LAUFZEIT NEU STARTEN" klicken oder das Menü "Laufzeit > Laufzeit neu starten ..." verwenden. Dies liegt an der Art und Weise, wie Colab Pakete lädt.

Wenn Sie nicht auf Colab sind, können Sie die Laufzeit mit der folgenden Zelle neu starten.

import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Melden Sie sich für dieses Notizbuch bei Google an

Wenn Sie dieses Notizbuch auf Colab ausführen, authentifizieren Sie sich mit Ihrem Benutzerkonto:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Wenn Sie auf AI - Plattform Notebooks, Authentifizieren mit Google Cloud , bevor Sie den nächsten Abschnitt ausgeführt wird , indem Sie

gcloud auth login

im Terminal - Fenster (das Sie über Datei> Neu im Menü öffnen kann). Sie müssen dies nur einmal pro Notebook-Instanz tun.

Überprüfen Sie die Paketversionen.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 1.15.5
WARNING:absl:tfx_bsl.tfxio.tensor_to_arrow can only handle evaluated tensors (i.e. ndarays or SparseTensorValues) in TF 1.x.
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.
TFX version: 0.30.0
KFP version: 1.6.1

Variablen einrichten

Wir werden einige Variablen einrichten, die verwendet werden, um die Pipelines unten anzupassen. Folgende Angaben sind erforderlich:

  • GCP-Projekt-ID. Siehe Identifizieren Projekt - ID .
  • GCP-Region zum Ausführen von Pipelines. Weitere Informationen zu den Regionen , dass Vertex Pipelines in verfügbar ist, finden Sie in der Vertex AI Standorten Führung .
  • Google Cloud Storage Bucket zum Speichern von Pipelineausgaben.

Geben Sie die erforderlichen Werte in der Zelle unten , bevor es ausgeführt wird .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Set gcloud Ihr Projekt zu verwenden.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

Beispieldaten vorbereiten

Wir werden das gleiche verwenden Palmer Penguins - Datensatz als einfache TFX Pipeline Tutorial .

Es gibt vier numerische Merkmale in diesem Datensatz, die bereits auf den Bereich [0,1] normalisiert wurden. Wir werden ein Klassifikationsmodell bauen , die die vorhersagt species von Pinguinen.

Wir müssen unsere eigene Kopie des Datensatzes erstellen. Da TFX ExampleGen Eingaben aus einem Verzeichnis liest, müssen wir ein Verzeichnis erstellen und den Datensatz auf GCS dorthin kopieren.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

Sehen Sie sich die CSV-Datei kurz an.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

Erstellen Sie eine Pipeline

Unsere Pipeline wird sehr ähnlich der Pipeline wir in erstellt einfache TFX Pipeline für Vertex Pipelines Tutorial . Die Pipeline besteht aus drei Komponenten, CsvExampleGen, Trainer und Pusher. Wir werden jedoch eine spezielle Trainerkomponente verwenden, die verwendet wird, um Trainingsworkloads auf Vertex AI zu verschieben.

TFX bietet einen speziellen Trainer Ausbildung Jobs Vertex KI Schulungs - Service zu unterbreiten, und alles , was wir tun müssen , ist nur mit Trainer in dem Erweiterungsmodul anstelle der Standard - Trainer Komponente zusammen mit einigen erforderlichen GCP - Parametern.

In diesem Tutorial werden wir Vertex AI Training-Jobs zunächst nur mit CPUs und dann mit einer GPU ausführen.

Modellcode schreiben.

Das Modell selbst ist fast ähnlich das Modell in einfachen TFX Pipeline Tutorial .

Wir fügen _get_distribution_strategy() Funktion , die eine schafft TensorFlow Vertriebsstrategie und es in verwendet wird run_fn MirroredStrategy zu verwenden , wenn GPU zur Verfügung.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Kopieren Sie die Moduldatei in GCS, auf die von den Pipelinekomponenten zugegriffen werden kann.

Andernfalls möchten Sie möglicherweise ein Container-Image einschließlich der Moduldatei erstellen und das Image zum Ausführen der Pipeline- und AI Platform-Trainingsjobs verwenden.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

Schreiben Sie eine Pipeline-Definition

Wir werden eine Funktion zum Erstellen einer TFX-Pipeline definieren. Es hat die gleichen drei Komponenten , wie in einfachem TFX Pipeline Tutorial , aber wir verwenden , um eine Trainer - Komponente in dem GCP - Erweiterungsmodul.

tfx.extensions.google_cloud_ai_platform.Trainer verhält sich wie ein normaler Trainer , aber es bewegt sich nur die Berechnung für das Modelltraining zu Wolke. Es startet einen benutzerdefinierten Job im Vertex AI Training-Dienst und die Trainerkomponente im Orchestrierungssystem wartet nur, bis der Vertex AI Training-Job abgeschlossen ist.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Führen Sie die Pipeline auf Vertex-Pipelines aus.

Wir verwenden Vertex Pipelines die Pipeline laufen , wie wir in tat Einfache TFX Pipeline für Vertex Pipelines Tutorial .

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

Die generierte Definitionsdatei kann mit dem kfp-Client übermittelt werden.

from kfp.v2.google import client

pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)

_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)

Jetzt können Sie den Link in der Ausgabe über besuchen oder Besuch ‚Vertex AI> Pipelines‘ in Google Cloud Console den Fortschritt zu sehen.

Führen Sie die Pipeline mit einer GPU aus

Vertex AI unterstützt das Training mit verschiedenen Maschinentypen, einschließlich der Unterstützung für GPUs. Siehe Maschinen spec Referenz der verfügbaren Optionen.

Wir haben bereits unsere Pipeline definiert, um GPU-Training zu unterstützen. Alles , was wir tun müssen , ist Einstellung use_gpu Flag auf True. Dann wird eine Pipeline mit einer Maschine Spezifikation erstellt werden , darunter ein NVIDIA_TESLA_K80 und unser Modell Trainingscode verwenden tf.distribute.MirroredStrategy .

Man beachte , dass use_gpu Flag nicht ein Teil des Scheitels oder TFX - API ist. Es wird nur verwendet, um den Trainingscode in diesem Tutorial zu steuern.

runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)

Jetzt können Sie den Link in der Ausgabe über besuchen oder Besuch ‚Vertex AI> Pipelines‘ in Google Cloud Console den Fortschritt zu sehen.