Pipeline TFX semplice per tubazioni Vertex

Questo tutorial basato su notebook creerà una semplice pipeline TFX e la eseguirà utilizzando Google Cloud Vertex Pipelines. Questo notebook è basato sulla pipeline TFX che abbiamo creato in Simple TFX Pipeline Tutorial . Se non hai familiarità con TFX e non hai ancora letto quel tutorial, dovresti leggerlo prima di procedere con questo notebook.

Google Cloud Vertex Pipelines ti aiuta ad automatizzare, monitorare e governare i tuoi sistemi ML orchestrando il tuo flusso di lavoro ML in modo serverless. Puoi definire le tue pipeline ML utilizzando Python con TFX, quindi eseguire le tue pipeline su Google Cloud. Consulta l' introduzione di Vertex Pipelines per ulteriori informazioni sulle Vertex Pipelines.

Questo notebook è progettato per essere eseguito su Google Colab o sui notebook AI Platform . Se non stai utilizzando uno di questi, puoi semplicemente fare clic sul pulsante "Esegui in Google Colab" sopra.

Impostare

Prima di eseguire questo notebook, assicurarsi di disporre di quanto segue:

Consulta la documentazione di Vertex per configurare ulteriormente il tuo progetto GCP.

Installa pacchetti Python

Installeremo i pacchetti Python richiesti, inclusi TFX e KFP, per creare pipeline ML e inviare lavori a Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Hai riavviato il runtime?

Se stai utilizzando Google Colab, la prima volta che esegui la cella sopra, devi riavviare il runtime facendo clic sopra il pulsante "RIAVVIA RUNTIME" o utilizzando il menu "Runtime > Riavvia runtime...". Ciò è dovuto al modo in cui Colab carica i pacchetti.

Se non sei su Colab, puoi riavviare il runtime con la cella seguente.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Accedi a Google per questo notebook

Se stai eseguendo questo notebook su Colab, autenticati con il tuo account utente:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Se utilizzi AI Platform Notebooks , autenticati con Google Cloud prima di eseguire la sezione successiva, eseguendo

gcloud auth login

nella finestra Terminale (che puoi aprire tramite File > Nuovo nel menu). Devi farlo solo una volta per istanza notebook.

Controlla le versioni del pacchetto.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Imposta variabili

Imposteremo alcune variabili utilizzate per personalizzare le pipeline di seguito. Sono richieste le seguenti informazioni:

Immettere i valori richiesti nella cella sottostante prima di eseguirlo .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Imposta gcloud per utilizzare il tuo progetto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

Preparare dati di esempio

Useremo lo stesso set di dati di Palmer Penguins come Simple TFX Pipeline Tutorial .

Ci sono quattro caratteristiche numeriche in questo set di dati che erano già normalizzate per avere un intervallo [0,1]. Costruiremo un modello di classificazione che predice le species di pinguini.

Dobbiamo creare la nostra copia del set di dati. Poiché TFX ExampleGen legge gli input da una directory, è necessario creare una directory e copiarvi il set di dati su GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

Dai una rapida occhiata al file CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

Crea una pipeline

Le pipeline TFX sono definite utilizzando le API Python. Definiremo una pipeline composta da tre componenti, CsvExampleGen, Trainer e Pusher. La definizione della pipeline e del modello è quasi la stessa di Simple TFX Pipeline Tutorial .

L'unica differenza è che non è necessario impostare metadata_connection_config che viene utilizzato per individuare il database di metadati ML . Poiché Vertex Pipelines utilizza un servizio di metadati gestito, gli utenti non devono occuparsene e non è necessario specificare il parametro.

Prima di definire effettivamente la pipeline, è necessario prima scrivere un codice modello per il componente Trainer.

Scrivi il codice del modello.

Utilizzeremo lo stesso codice del modello del tutorial Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copiare il file del modulo in GCS a cui è possibile accedere dai componenti della pipeline. Poiché l'addestramento del modello avviene su GCP, è necessario caricare questa definizione del modello.

In caso contrario, potresti voler creare un'immagine del contenitore che includa il file del modulo e utilizzare l'immagine per eseguire la pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

Scrivi una definizione di pipeline

Definiremo una funzione per creare una pipeline TFX.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Eseguire la pipeline su Vertex Pipelines.

Abbiamo usato LocalDagRunner che funziona in ambiente locale in Simple TFX Pipeline Tutorial . TFX fornisce più agenti di orchestrazione per eseguire la pipeline. In questo tutorial useremo le Vertex Pipelines insieme al dag runner Kubeflow V2.

Dobbiamo definire un corridore per eseguire effettivamente la pipeline. Compilerai la tua pipeline nel nostro formato di definizione della pipeline utilizzando le API TFX.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

Il file di definizione generato può essere inviato utilizzando il client kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ora puoi visitare "Vertex AI > Pipelines" in Google Cloud Console per vedere i progressi.