Pipeline TFX simple pour Vertex Pipelines

Ce didacticiel basé sur un ordinateur portable créera un pipeline TFX simple et l'exécutera à l'aide de Google Cloud Vertex Pipelines. Ce bloc-notes est basé sur le pipeline TFX que nous avons construit dans Tutoriel sur le pipeline TFX simple . Si vous n'êtes pas familier avec TFX et que vous n'avez pas encore lu ce didacticiel, vous devriez le lire avant de continuer avec ce bloc-notes.

Google Cloud Vertex Pipelines vous aide à automatiser, surveiller et gouverner vos systèmes de ML en orchestrant votre flux de travail de ML sans serveur. Vous pouvez définir vos pipelines de ML à l'aide de Python avec TFX, puis exécuter vos pipelines sur Google Cloud. Voir l' introduction de Vertex Pipelines pour en savoir plus sur Vertex Pipelines.

Ce bloc-notes est destiné à être exécuté sur Google Colab ou sur AI Platform Notebooks . Si vous n'en utilisez pas, vous pouvez simplement cliquer sur le bouton "Exécuter dans Google Colab" ci-dessus.

Mettre en place

Avant d'exécuter ce notebook, assurez-vous que vous disposez des éléments suivants :

Veuillez consulter la documentation Vertex pour configurer davantage votre projet GCP.

Installer les packages Python

Nous installerons les packages Python requis, y compris TFX et KFP, pour créer des pipelines ML et soumettre des travaux à Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

As-tu redémarré le runtime ?

Si vous utilisez Google Colab, la première fois que vous exécutez la cellule ci-dessus, vous devez redémarrer le runtime en cliquant au-dessus du bouton "REDÉMARRER LE RUNTIME" ou en utilisant le menu "Runtime > Redémarrer le runtime...". Cela est dû à la façon dont Colab charge les packages.

Si vous n'êtes pas sur Colab, vous pouvez redémarrer l'exécution avec la cellule suivante.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Connectez-vous à Google pour ce bloc-notes

Si vous exécutez ce notebook sur Colab, authentifiez-vous avec votre compte utilisateur :

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Si vous êtes sur AI Platform Notebooks , authentifiez-vous auprès de Google Cloud avant d'exécuter la section suivante, en exécutant

gcloud auth login

dans la fenêtre Terminal (que vous pouvez ouvrir via Fichier > Nouveau dans le menu). Vous n'avez besoin de le faire qu'une seule fois par instance de bloc-notes.

Vérifiez les versions des packages.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Configurer des variables

Nous allons configurer quelques variables utilisées pour personnaliser les pipelines ci-dessous. Les informations suivantes sont requises :

Entrez les valeurs requises dans la cellule ci-dessous avant de l'exécuter .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Configurez gcloud pour utiliser votre projet.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

Préparer des exemples de données

Nous utiliserons le même ensemble de données Palmer Penguins que le didacticiel Simple TFX Pipeline .

Il y a quatre caractéristiques numériques dans cet ensemble de données qui ont déjà été normalisées pour avoir une plage [0,1]. Nous allons construire un modèle de classification qui prédit les species de manchots.

Nous devons faire notre propre copie de l'ensemble de données. Étant donné que TFX ExampleGen lit les entrées d'un répertoire, nous devons créer un répertoire et y copier l'ensemble de données sur GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

Jetez un coup d'œil au fichier CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

Créer un pipeline

Les pipelines TFX sont définis à l'aide d'API Python. Nous allons définir un pipeline composé de trois composants, CsvExampleGen, Trainer et Pusher. La définition du pipeline et du modèle est presque la même que celle du didacticiel sur le pipeline TFX simple .

La seule différence est que nous n'avons pas besoin de définir metadata_connection_config qui est utilisé pour localiser la base de données de métadonnées ML . Étant donné que Vertex Pipelines utilise un service de métadonnées gérées, les utilisateurs n'ont pas besoin de s'en soucier et nous n'avons pas besoin de spécifier le paramètre.

Avant de définir réellement le pipeline, nous devons d'abord écrire un code de modèle pour le composant Trainer.

Écrivez le code du modèle.

Nous utiliserons le même code de modèle que dans le didacticiel sur le pipeline TFX simple .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copiez le fichier du module dans GCS, accessible à partir des composants du pipeline. Étant donné que l'entraînement du modèle a lieu sur GCP, nous devons importer cette définition de modèle.

Sinon, vous souhaiterez peut-être créer une image de conteneur incluant le fichier de module et utiliser l'image pour exécuter le pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

Écrire une définition de pipeline

Nous allons définir une fonction pour créer un pipeline TFX.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Exécutez le pipeline sur Vertex Pipelines.

Nous avons utilisé LocalDagRunner qui s'exécute sur l'environnement local dans Simple TFX Pipeline Tutorial . TFX fournit plusieurs orchestrateurs pour exécuter votre pipeline. Dans ce didacticiel, nous utiliserons Vertex Pipelines avec le dag runner Kubeflow V2.

Nous devons définir un runner pour exécuter réellement le pipeline. Vous compilerez votre pipeline dans notre format de définition de pipeline à l'aide des API TFX.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

Le fichier de définition généré peut être soumis à l'aide du client kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Vous pouvez désormais accéder à "Vertex AI > Pipelines" dans Google Cloud Console pour voir la progression.