Pipeline TFX Simples para Pipelines Vertex

Este tutorial baseado em notebook criará um pipeline TFX simples e o executará usando o Google Cloud Vertex Pipelines. Este notebook é baseado no pipeline TFX que criamos no Tutorial Simple TFX Pipeline . Se você não estiver familiarizado com o TFX e ainda não leu esse tutorial, leia-o antes de prosseguir com este notebook.

O Google Cloud Vertex Pipelines ajuda você a automatizar, monitorar e controlar seus sistemas de ML orquestrando seu fluxo de trabalho de ML sem servidor. Você pode definir seus pipelines de ML usando Python com TFX e, em seguida, executar seus pipelines no Google Cloud. Consulte a introdução do Vertex Pipelines para saber mais sobre o Vertex Pipelines.

Este notebook deve ser executado no Google Colab ou no AI Platform Notebooks . Se você não estiver usando um desses, basta clicar no botão "Executar no Google Colab" acima.

Configurar

Antes de executar este notebook, certifique-se de ter o seguinte:

Consulte a documentação do Vertex para configurar ainda mais seu projeto do GCP.

Instalar pacotes python

Instalaremos os pacotes Python necessários, incluindo TFX e KFP, para criar pipelines de ML e enviar trabalhos para Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Você reiniciou o tempo de execução?

Se você estiver usando o Google Colab, na primeira vez que executar a célula acima, deverá reiniciar o runtime clicando no botão acima "RESTART RUNTIME" ou usando o menu "Runtime > Restart runtime...". Isso ocorre devido à maneira como o Colab carrega os pacotes.

Se você não estiver no Colab, poderá reiniciar o tempo de execução com a seguinte célula.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Faça login no Google para este notebook

Se você estiver executando este notebook no Colab, autentique-se com sua conta de usuário:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Se você estiver no AI Platform Notebooks , autentique-se no Google Cloud antes de executar a próxima seção, executando

gcloud auth login

na janela Terminal (que você pode abrir via Arquivo > Novo no menu). Você só precisa fazer isso uma vez por instância de notebook.

Verifique as versões do pacote.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Configurar variáveis

Vamos configurar algumas variáveis ​​usadas para customizar os pipelines abaixo. As seguintes informações são necessárias:

  • Código do projeto do GCP. Consulte Identificando o ID do seu projeto .
  • Região do GCP para executar pipelines. Para obter mais informações sobre as regiões em que o Vertex Pipelines está disponível, consulte o guia de locais do Vertex AI .
  • Bucket do Google Cloud Storage para armazenar as saídas do pipeline.

Insira os valores necessários na célula abaixo antes de executá-la .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Defina o gcloud para usar seu projeto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines

Preparar dados de exemplo

Usaremos o mesmo conjunto de dados do Palmer Penguins que o Tutorial Simple TFX Pipeline .

Existem quatro características numéricas neste conjunto de dados que já foram normalizadas para ter intervalo [0,1]. Construiremos um modelo de classificação que prevê as species de pinguins.

Precisamos fazer nossa própria cópia do conjunto de dados. Como o TFX ExampleGen lê entradas de um diretório, precisamos criar um diretório e copiar o conjunto de dados para ele no GCS.

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".

Dê uma olhada rápida no arquivo CSV.

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".

Criar um pipeline

Os pipelines do TFX são definidos usando APIs do Python. Vamos definir um pipeline que consiste em três componentes, CsvExampleGen, Trainer e Pusher. A definição de pipeline e modelo é quase a mesma do Tutorial Simple TFX Pipeline .

A única diferença é que não precisamos definir metadata_connection_config que é usado para localizar o banco de dados ML Metadata . Como o Vertex Pipelines usa um serviço de metadados gerenciados, os usuários não precisam se preocupar com isso e não precisamos especificar o parâmetro.

Antes de realmente definir o pipeline, precisamos primeiro escrever um código de modelo para o componente Trainer.

Escreva o código do modelo.

Usaremos o mesmo código de modelo do Tutorial Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copie o arquivo do módulo para o GCS, que pode ser acessado a partir dos componentes do pipeline. Como o treinamento de modelo acontece no GCP, precisamos fazer upload dessa definição de modelo.

Caso contrário, convém criar uma imagem de contêiner incluindo o arquivo de módulo e usar a imagem para executar o pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".

Escreva uma definição de pipeline

Vamos definir uma função para criar um pipeline TFX.

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

Execute o pipeline no Vertex Pipelines.

Usamos LocalDagRunner que roda em ambiente local no Tutorial Simple TFX Pipeline . O TFX fornece vários orquestradores para executar seu pipeline. Neste tutorial, usaremos o Vertex Pipelines junto com o dag runner Kubeflow V2.

Precisamos definir um runner para realmente executar o pipeline. Você compilará seu pipeline em nosso formato de definição de pipeline usando APIs do TFX.

import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

O arquivo de definição gerado pode ser enviado usando o cliente kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Agora você pode visitar 'Vertex AI > Pipelines' no Google Cloud Console para ver o progresso.