Lettura dei dati da BigQuery con TFX e Vertex Pipelines

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questo tutorial basato su notebook utilizzerà Google Cloud BigQuery come origine dati per addestrare un modello ML. La pipeline ML verrà costruita utilizzando TFX ed eseguita su Google Cloud Vertex Pipelines.

Questo notebook è basato sulla pipeline TFX che abbiamo creato in Simple TFX Pipeline for Vertex Pipelines Tutorial . Se non hai ancora letto quel tutorial, dovresti leggerlo prima di procedere con questo taccuino.

BigQuery è un data warehouse multi-cloud serverless, altamente scalabile ed economico progettato per l'agilità aziendale. TFX può essere utilizzato per leggere i dati di addestramento da BigQuery e per pubblicare il modello addestrato in BigQuery.

In questo tutorial utilizzeremo il componente BigQueryExampleGen che legge i dati da BigQuery alle pipeline TFX.

Questo notebook è progettato per essere eseguito su Google Colab o sui notebook AI Platform . Se non stai utilizzando uno di questi, puoi semplicemente fare clic sul pulsante "Esegui in Google Colab" sopra.

Impostare

Se hai completato il tutorial Simple TFX Pipeline for Vertex Pipelines , avrai un progetto GCP funzionante e un bucket GCS e questo è tutto ciò di cui abbiamo bisogno per questo tutorial. Si prega di leggere prima il tutorial preliminare se te lo sei perso.

Installa pacchetti Python

Installeremo i pacchetti Python richiesti, inclusi TFX e KFP, per creare pipeline ML e inviare lavori a Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Hai riavviato il runtime?

Se stai utilizzando Google Colab, la prima volta che esegui la cella sopra, devi riavviare il runtime facendo clic sopra il pulsante "RIAVVIA RUNTIME" o utilizzando il menu "Runtime > Riavvia runtime...". Ciò è dovuto al modo in cui Colab carica i pacchetti.

Se non sei su Colab, puoi riavviare il runtime con la cella seguente.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Accedi a Google per questo notebook

Se stai eseguendo questo notebook su Colab, autenticati con il tuo account utente:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Se utilizzi AI Platform Notebooks , autenticati con Google Cloud prima di eseguire la sezione successiva, eseguendo

gcloud auth login

nella finestra Terminale (che puoi aprire tramite File > Nuovo nel menu). Devi farlo solo una volta per istanza notebook.

Controlla le versioni del pacchetto.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Imposta variabili

Imposteremo alcune variabili utilizzate per personalizzare le pipeline di seguito. Sono richieste le seguenti informazioni:

Immettere i valori richiesti nella cella sottostante prima di eseguirlo .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Imposta gcloud per utilizzare il tuo progetto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Per impostazione predefinita, Vertex Pipelines utilizza l'account di servizio GCE VM predefinito di formato [project-number]-compute@developer.gserviceaccount.com . È necessario concedere un'autorizzazione per utilizzare BigQuery a questo account per accedere a BigQuery nella pipeline. Aggiungeremo il ruolo "Utente BigQuery" all'account.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Consulta la documentazione di Vertex per ulteriori informazioni sugli account di servizio e sulla configurazione IAM.

Crea una pipeline

Le pipeline TFX sono definite utilizzando le API Python come abbiamo fatto in Simple TFX Pipeline for Vertex Pipelines Tutorial . In precedenza abbiamo utilizzato CsvExampleGen che legge i dati da un file CSV. In questo tutorial utilizzeremo il componente BigQueryExampleGen che legge i dati da BigQuery.

Prepara la query BigQuery

Utilizzeremo lo stesso set di dati di Palmer Penguins . Tuttavia, lo leggeremo da una tabella BigQuery tfx-oss-public.palmer_penguins.palmer_penguins che è popolata utilizzando lo stesso file CSV.

Se utilizzi Google Colab, puoi esaminare direttamente il contenuto della tabella BigQuery.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Tutte le caratteristiche erano già normalizzate a 0~1 tranne la species che è l'etichetta. Costruiremo un modello di classificazione che predice le species di pinguini.

BigQueryExampleGen richiede una query per specificare quali dati recuperare. Poiché utilizzeremo tutti i campi di tutte le righe della tabella, la query è abbastanza semplice. Puoi anche specificare i nomi dei campi e aggiungere condizioni WHERE secondo necessità in base alla sintassi SQL standard di BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Scrivi il codice del modello.

Utilizzeremo lo stesso codice del modello del tutorial Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copiare il file del modulo in GCS a cui è possibile accedere dai componenti della pipeline. Poiché l'addestramento del modello avviene su GCP, è necessario caricare questa definizione del modello.

In caso contrario, potresti voler creare un'immagine del contenitore che includa il file del modulo e utilizzare l'immagine per eseguire la pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Scrivi una definizione di pipeline

Definiremo una funzione per creare una pipeline TFX. Dobbiamo usare BigQueryExampleGen che accetta query come argomento. Un'altra modifica rispetto al tutorial precedente è che dobbiamo passare beam_pipeline_args che viene passato ai componenti quando vengono eseguiti. Utilizzeremo beam_pipeline_args per passare parametri aggiuntivi a BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Eseguire la pipeline su Vertex Pipelines.

Useremo Vertex Pipelines per eseguire la pipeline come abbiamo fatto in Simple TFX Pipeline for Vertex Pipelines Tutorial .

Dobbiamo anche passare beam_pipeline_args per BigQueryExampleGen. Include configurazioni come il nome del progetto GCP e l'archiviazione temporanea per l'esecuzione di BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

Il file di definizione generato può essere inviato utilizzando il client kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Ora puoi visitare "Vertex AI > Pipelines" in Google Cloud Console per vedere i progressi.