Canalización TFX simple para canalizaciones Vertex

Este tutorial basado en notebook creará una canalización TFX simple y la ejecutará con Google Cloud Vertex Pipelines. Este cuaderno se basa en la canalización de TFX que construimos en Tutorial de canalización de TFX simple . Si no está familiarizado con TFX y aún no ha leído ese tutorial, debe leerlo antes de continuar con este cuaderno.

Google Cloud Vertex Pipelines lo ayuda a automatizar, monitorear y gobernar sus sistemas de ML al organizar su flujo de trabajo de ML sin servidor. Puede definir sus canalizaciones de ML usando Python con TFX y luego ejecutar sus canalizaciones en Google Cloud. Consulte la introducción de Vertex Pipelines para obtener más información sobre Vertex Pipelines.

Este cuaderno está diseñado para ejecutarse en Google Colab o en cuadernos de AI Platform . Si no está utilizando uno de estos, simplemente puede hacer clic en el botón "Ejecutar en Google Colab" arriba.

Configurar

Antes de ejecutar este cuaderno, asegúrese de tener lo siguiente:

Consulte la documentación de Vertex para configurar más su proyecto de GCP.

Instalar paquetes de python

Instalaremos los paquetes de Python necesarios, incluidos TFX y KFP, para crear canalizaciones de aprendizaje automático y enviar trabajos a Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

¿Reiniciaste el tiempo de ejecución?

Si está utilizando Google Colab, la primera vez que ejecuta la celda anterior, debe reiniciar el tiempo de ejecución haciendo clic en el botón "REINICIAR TIEMPO DE EJECUCIÓN" o usando el menú "Tiempo de ejecución > Reiniciar tiempo de ejecución...". Esto se debe a la forma en que Colab carga los paquetes.

Si no está en Colab, puede reiniciar el tiempo de ejecución con la siguiente celda.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Iniciar sesión en Google para este cuaderno

Si está ejecutando este cuaderno en Colab, autentíquese con su cuenta de usuario:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Si usa AI Platform Notebooks , autentíquese con Google Cloud antes de ejecutar la siguiente sección, ejecutando

gcloud auth login

en la ventana Terminal (que puede abrir a través de Archivo > Nuevo en el menú). Solo necesita hacer esto una vez por instancia de notebook.

Compruebe las versiones del paquete.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Configurar variables

Configuraremos algunas variables utilizadas para personalizar las canalizaciones a continuación. Se requiere la siguiente información:

  • ID del proyecto de GCP. Consulte Identificar el ID de su proyecto .
  • Región de GCP para ejecutar canalizaciones. Para obtener más información sobre las regiones en las que está disponible Vertex Pipelines, consulte la guía de ubicaciones de Vertex AI .
  • Cubo de Google Cloud Storage para almacenar resultados de canalización.

Ingrese los valores requeridos en la celda a continuación antes de ejecutarlo .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Configura gcloud para usar tu proyecto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

Preparar datos de ejemplo

Usaremos el mismo conjunto de datos de Palmer Penguins que el Tutorial simple de tubería TFX .

Hay cuatro características numéricas en este conjunto de datos que ya se normalizaron para tener un rango [0,1]. Construiremos un modelo de clasificación que prediga las species de pingüinos.

Necesitamos hacer nuestra propia copia del conjunto de datos. Debido a que TFX ExampleGen lee las entradas de un directorio, necesitamos crear un directorio y copiarle un conjunto de datos en GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

Eche un vistazo rápido al archivo CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

Crear una canalización

Las canalizaciones de TFX se definen mediante las API de Python. Definiremos una canalización que consta de tres componentes, CsvExampleGen, Trainer y Pusher. La canalización y la definición del modelo son casi las mismas que las del Tutorial de canalización TFX simple .

La única diferencia es que no necesitamos configurar metadata_connection_config que se usa para ubicar la base de datos de metadatos de ML . Debido a que Vertex Pipelines usa un servicio de metadatos administrados, los usuarios no necesitan preocuparse por él y no necesitamos especificar el parámetro.

Antes de definir realmente la canalización, primero debemos escribir un código modelo para el componente Entrenador.

Escriba el código del modelo.

Usaremos el mismo código de modelo que en el Tutorial de canalización TFX simple .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copie el archivo del módulo en GCS, al que se puede acceder desde los componentes de la canalización. Debido a que el entrenamiento del modelo ocurre en GCP, necesitamos cargar esta definición del modelo.

De lo contrario, es posible que desee crear una imagen de contenedor que incluya el archivo del módulo y usar la imagen para ejecutar la canalización.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

Escribir una definición de canalización

Definiremos una función para crear una canalización TFX.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Ejecute la canalización en Vertex Pipelines.

Usamos LocalDagRunner , que se ejecuta en un entorno local en el Tutorial de tubería TFX simple . TFX proporciona múltiples orquestadores para ejecutar su canalización. En este tutorial, usaremos Vertex Pipelines junto con Kubeflow V2 dag runner.

Necesitamos definir un corredor para ejecutar realmente la canalización. Compilará su canalización en nuestro formato de definición de canalización utilizando las API de TFX.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

El archivo de definición generado se puede enviar mediante el cliente kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ahora puede visitar 'Vertex AI > Pipelines' en Google Cloud Console para ver el progreso.