Lire les données de BigQuery avec TFX et Vertex Pipelines

Ce didacticiel basé sur un bloc-notes utilisera Google Cloud BigQuery comme source de données pour former un modèle ML. Le pipeline ML sera construit à l'aide de TFX et exécuté sur Google Cloud Vertex Pipelines.

Ce bloc-notes est basé sur le pipeline TFX que nous avons construit dans le tutoriel Simple TFX Pipeline for Vertex Pipelines . Si vous n'avez pas encore lu ce didacticiel, vous devriez le lire avant de continuer avec ce bloc-notes.

BigQuery est un entrepôt de données multicloud sans serveur, hautement évolutif et économique, conçu pour l'agilité de l'entreprise. TFX peut être utilisé pour lire les données d'entraînement à partir de BigQuery et pour publier le modèle entraîné dans BigQuery.

Dans ce didacticiel, nous utiliserons le composant BigQueryExampleGen qui lit les données de BigQuery vers les pipelines TFX.

Ce bloc-notes est destiné à être exécuté sur Google Colab ou sur AI Platform Notebooks . Si vous n'en utilisez pas, vous pouvez simplement cliquer sur le bouton "Exécuter dans Google Colab" ci-dessus.

Mettre en place

Si vous avez terminé le didacticiel Simple TFX Pipeline for Vertex Pipelines , vous disposerez d'un projet GCP fonctionnel et d'un compartiment GCS. C'est tout ce dont nous avons besoin pour ce didacticiel. Veuillez d'abord lire le didacticiel préliminaire si vous l'avez manqué.

Installer les packages Python

Nous installerons les packages Python requis, y compris TFX et KFP, pour créer des pipelines ML et soumettre des travaux à Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

As-tu redémarré le runtime ?

Si vous utilisez Google Colab, la première fois que vous exécutez la cellule ci-dessus, vous devez redémarrer le runtime en cliquant au-dessus du bouton "REDÉMARRER LE RUNTIME" ou en utilisant le menu "Runtime > Redémarrer le runtime...". Cela est dû à la façon dont Colab charge les packages.

Si vous n'êtes pas sur Colab, vous pouvez redémarrer l'exécution avec la cellule suivante.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Connectez-vous à Google pour ce bloc-notes

Si vous exécutez ce notebook sur Colab, authentifiez-vous avec votre compte utilisateur :

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

Si vous êtes sur AI Platform Notebooks , authentifiez-vous auprès de Google Cloud avant d'exécuter la section suivante, en exécutant

gcloud auth login

dans la fenêtre Terminal (que vous pouvez ouvrir via Fichier > Nouveau dans le menu). Vous n'avez besoin de le faire qu'une seule fois par instance de bloc-notes.

Vérifiez les versions des packages.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Configurer des variables

Nous allons configurer quelques variables utilisées pour personnaliser les pipelines ci-dessous. Les informations suivantes sont requises :

Entrez les valeurs requises dans la cellule ci-dessous avant de l'exécuter .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Configurez gcloud pour utiliser votre projet.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Par défaut, Vertex Pipelines utilise le compte de service GCE VM par défaut au format [project-number]-compute@developer.gserviceaccount.com . Nous devons autoriser ce compte à utiliser BigQuery pour accéder à BigQuery dans le pipeline. Nous ajouterons le rôle "Utilisateur BigQuery" au compte.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

Veuillez consulter la documentation Vertex pour en savoir plus sur les comptes de service et la configuration IAM.

Créer un pipeline

Les pipelines TFX sont définis à l'aide d'API Python, comme nous l'avons fait dans le tutoriel Simple TFX Pipeline for Vertex Pipelines . Nous utilisions auparavant CsvExampleGen qui lit les données d'un fichier CSV. Dans ce didacticiel, nous utiliserons le composant BigQueryExampleGen qui lit les données de BigQuery.

Préparer la requête BigQuery

Nous utiliserons le même jeu de données Palmer Penguins . Cependant, nous le lirons à partir d'une table tfx-oss-public.palmer_penguins.palmer_penguins qui est renseignée à l'aide du même fichier CSV.

Si vous utilisez Google Colab, vous pouvez examiner directement le contenu de la table BigQuery.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Toutes les caractéristiques ont déjà été normalisées à 0 ~ 1, à l'exception de l' species qui est l'étiquette. Nous allons construire un modèle de classification qui prédit les species de manchots.

BigQueryExampleGen nécessite une requête pour spécifier les données à récupérer. Comme nous allons utiliser tous les champs de toutes les lignes de la table, la requête est assez simple. Vous pouvez également spécifier des noms de champs et ajouter des conditions WHERE selon vos besoins, conformément à la syntaxe BigQuery Standard SQL .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Écrivez le code du modèle.

Nous utiliserons le même code de modèle que dans le didacticiel sur le pipeline TFX simple .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Copiez le fichier du module dans GCS, accessible à partir des composants du pipeline. Étant donné que l'entraînement du modèle a lieu sur GCP, nous devons importer cette définition de modèle.

Sinon, vous souhaiterez peut-être créer une image de conteneur incluant le fichier de module et utiliser l'image pour exécuter le pipeline.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Écrire une définition de pipeline

Nous allons définir une fonction pour créer un pipeline TFX. Nous devons utiliser BigQueryExampleGen qui prend query en argument. Un autre changement par rapport au didacticiel précédent est que nous devons transmettre beam_pipeline_args qui est transmis aux composants lorsqu'ils sont exécutés. Nous utiliserons beam_pipeline_args pour transmettre des paramètres supplémentaires à BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

Exécutez le pipeline sur Vertex Pipelines.

Nous utiliserons Vertex Pipelines pour exécuter le pipeline comme nous l'avons fait dans Simple TFX Pipeline for Vertex Pipelines Tutorial .

Nous devons également transmettre beam_pipeline_args pour BigQueryExampleGen. Il inclut des configurations telles que le nom du projet GCP et le stockage temporaire pour l'exécution de BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

Le fichier de définition généré peut être soumis à l'aide du client kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Vous pouvez désormais accéder à "Vertex AI > Pipelines" dans Google Cloud Console pour voir la progression.