¡El Día de la Comunidad de ML es el 9 de noviembre! Únase a nosotros para recibir actualizaciones de TensorFlow, JAX, y más Más información

Leer datos de BigQuery con TFX y Vertex Pipelines

Este tutorial basado en el cuaderno utilizará la nube de Google BigQuery como fuente de datos para entrenar un modelo ML. La canalización de ML se construirá con TFX y se ejecutará en Google Cloud Vertex Pipelines.

Este portátil se basa en la tubería TFX hemos construido en simple TFX tubería por Vertex Tuberías Tutorial . Si aún no ha leído ese tutorial, debe leerlo antes de continuar con este cuaderno.

BigQuery es sin servidor, altamente escalable y almacenamiento de datos multi-nube rentable diseñado para la agilidad del negocio. TFX se puede utilizar para leer datos de entrenamiento de BigQuery y para publicar el modelo entrenado a BigQuery.

En este tutorial, vamos a utilizar el BigQueryExampleGen componente que lee datos de BigQuery para tuberías TFX.

Este portátil está diseñado para ejecutarse en Google Colab o en Cuadernos de la plataforma de IA . Si no está utilizando uno de estos, simplemente haga clic en el botón "Ejecutar en Google Colab" que se encuentra arriba.

Configurar

Si ha completado simple TFX tubería por Vertex Tuberías Tutorial , usted tiene un proyecto GCP de trabajo y un cubo de GCS y eso es todo lo que necesitamos para este tutorial. Lea primero el tutorial preliminar si se lo perdió.

Instalar paquetes de python

Instalaremos los paquetes de Python necesarios, incluidos TFX y KFP, para crear canalizaciones de ML y enviar trabajos a Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

¿Reinició el tiempo de ejecución?

Si está utilizando Google Colab, la primera vez que ejecuta la celda anterior, debe reiniciar el tiempo de ejecución haciendo clic sobre el botón "RESTART RUNTIME" o usando el menú "Runtime> Restart runtime ...". Esto se debe a la forma en que Colab carga los paquetes.

Si no está en Colab, puede reiniciar el tiempo de ejecución con la siguiente celda.

import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Inicie sesión en Google para este bloc de notas

Si está ejecutando este cuaderno en Colab, autentíquese con su cuenta de usuario:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Si se encuentra en la plataforma IA cuadernos, autenticarse con la nube de Google antes de ejecutar la siguiente sección, mediante la ejecución

gcloud auth login

en la ventana de terminal (que se puede abrir a través de Archivo> Nuevo en el menú). Solo necesita hacer esto una vez por instancia de notebook.

Verifique las versiones del paquete.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.5.1
TFX version: 1.2.0
KFP version: 1.8.1

Configurar variables

Configuraremos algunas variables que se utilizarán para personalizar las canalizaciones a continuación. Se requiere la siguiente información:

Introduzca los valores requeridos en la celda de abajo antes de ejecutarlo.

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Conjunto gcloud utilizar su proyecto.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Por defecto, el Vertex Tuberías utiliza la cuenta de servicio de la CME máquina virtual por defecto de formato [project-number]-compute@developer.gserviceaccount.com . Necesitamos otorgar permiso para usar BigQuery a esta cuenta para acceder a BigQuery en la canalización. Agregaremos la función 'Usuario de BigQuery' a la cuenta.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Por favor, vea la documentación de Vertex para aprender más acerca de las cuentas de servicio y configuración de IAM.

Crea una canalización

Tuberías TFX se definen utilizando las API de Python como lo hicimos en simple TFX tubería por Vertex Tuberías Tutorial . Se utilizó previamente CsvExampleGen el que lee datos de un archivo CSV. En este tutorial, vamos a utilizar BigQueryExampleGen componente que lee datos de BigQuery.

Preparar la consulta de BigQuery

Vamos a utilizar el mismo conjunto de datos Palmer Pingüinos . Sin embargo, vamos a leerlo de una tabla de BigQuery tfx-oss-public.palmer_penguins.palmer_penguins que se llena mediante el mismo archivo CSV.

Si usa Google Colab, puede examinar el contenido de la tabla de BigQuery directamente.

%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Todas las características ya se normalizaron a 0 ~ 1 excepto species que es la etiqueta. Vamos a construir un modelo de clasificación que predice las species de pingüinos.

BigQueryExampleGen requiere una consulta para especificar los datos para ir a buscar. Debido a que usaremos todos los campos de todas las filas de la tabla, la consulta es bastante simple. También puede especificar nombres de campo y añadir WHERE condiciones según sea necesario de acuerdo con la sintaxis SQL estándar BigQuery .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Escriba el código del modelo.

Vamos a utilizar el mismo código de modelo como en el simple TFX Pipeline Tutorial .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copie el archivo del módulo en GCS, al que se puede acceder desde los componentes de la canalización. Debido a que el entrenamiento de modelos ocurre en GCP, necesitamos subir esta definición de modelo.

De lo contrario, es posible que desee crear una imagen de contenedor que incluya el archivo del módulo y usar la imagen para ejecutar la canalización.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Escribe una definición de canalización

Definiremos una función para crear una canalización TFX. Tenemos que utilizar BigQueryExampleGen que tiene query como un argumento. Uno de los cambios más del tutorial anterior es que necesitamos pasar beam_pipeline_args que se pasa a los componentes cuando se ejecutan. Vamos a utilizar beam_pipeline_args para pasar parámetros adicionales a BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Ejecute la canalización en Vertex Pipelines.

Vamos a utilizar Vertex Tuberías para ejecutar la tubería como lo hicimos en simple TFX tubería por Vertex Tuberías Tutorial .

También tenemos que pasar beam_pipeline_args para la BigQueryExampleGen. Incluye configuraciones como el nombre del proyecto de GCP y el almacenamiento temporal para la ejecución de BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

El archivo de definición generado se puede enviar mediante el cliente kfp.

from kfp.v2.google import client

pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)

_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)

Ahora se puede visitar el enlace en la salida anterior o la visita de Vertex AI> Tuberías 'en Google Cloud Console para ver el progreso.