Voir sur TensorFlow.org | Exécuter dans Google Colab | Voir la source sur GitHub | Télécharger le cahier | Exécuter dans Google Cloud Vertex AI Workbench |
Ce didacticiel basé sur un bloc-notes utilisera Google Cloud BigQuery comme source de données pour former un modèle ML. Le pipeline ML sera construit à l'aide de TFX et exécuté sur Google Cloud Vertex Pipelines.
Ce bloc-notes est basé sur le pipeline TFX que nous avons construit dans le tutoriel Simple TFX Pipeline for Vertex Pipelines . Si vous n'avez pas encore lu ce didacticiel, vous devriez le lire avant de continuer avec ce bloc-notes.
BigQuery est un entrepôt de données multicloud sans serveur, hautement évolutif et économique, conçu pour l'agilité de l'entreprise. TFX peut être utilisé pour lire les données d'entraînement à partir de BigQuery et pour publier le modèle entraîné dans BigQuery.
Dans ce didacticiel, nous utiliserons le composant BigQueryExampleGen
qui lit les données de BigQuery vers les pipelines TFX.
Ce bloc-notes est destiné à être exécuté sur Google Colab ou sur AI Platform Notebooks . Si vous n'en utilisez pas, vous pouvez simplement cliquer sur le bouton "Exécuter dans Google Colab" ci-dessus.
Mettre en place
Si vous avez terminé le didacticiel Simple TFX Pipeline for Vertex Pipelines , vous disposerez d'un projet GCP fonctionnel et d'un compartiment GCS. C'est tout ce dont nous avons besoin pour ce didacticiel. Veuillez d'abord lire le didacticiel préliminaire si vous l'avez manqué.
Installer les packages Python
Nous installerons les packages Python requis, y compris TFX et KFP, pour créer des pipelines ML et soumettre des travaux à Vertex Pipelines.
# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"
As-tu redémarré le runtime ?
Si vous utilisez Google Colab, la première fois que vous exécutez la cellule ci-dessus, vous devez redémarrer le runtime en cliquant au-dessus du bouton "REDÉMARRER LE RUNTIME" ou en utilisant le menu "Runtime > Redémarrer le runtime...". Cela est dû à la façon dont Colab charge les packages.
Si vous n'êtes pas sur Colab, vous pouvez redémarrer l'exécution avec la cellule suivante.
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Connectez-vous à Google pour ce bloc-notes
Si vous exécutez ce notebook sur Colab, authentifiez-vous avec votre compte utilisateur :
import sys
if 'google.colab' in sys.modules:
from google.colab import auth
auth.authenticate_user()
Si vous êtes sur AI Platform Notebooks , authentifiez-vous auprès de Google Cloud avant d'exécuter la section suivante, en exécutant
gcloud auth login
dans la fenêtre Terminal (que vous pouvez ouvrir via Fichier > Nouveau dans le menu). Vous n'avez besoin de le faire qu'une seule fois par instance de bloc-notes.
Vérifiez les versions des packages.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1 TFX version: 1.6.0 KFP version: 1.8.11
Configurer des variables
Nous allons configurer quelques variables utilisées pour personnaliser les pipelines ci-dessous. Les informations suivantes sont requises :
- ID et numéro du projet GCP. Voir Identification de l'ID et du numéro de votre projet .
- Région GCP pour exécuter des pipelines. Pour plus d'informations sur les régions dans lesquelles Vertex Pipelines est disponible, consultez le guide des emplacements Vertex AI .
- Google Cloud Storage Bucket pour stocker les sorties de pipeline.
Entrez les valeurs requises dans la cellule ci-dessous avant de l'exécuter .
GOOGLE_CLOUD_PROJECT = '' # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = '' # <--- ENTER THIS
GOOGLE_CLOUD_REGION = '' # <--- ENTER THIS
GCS_BUCKET_NAME = '' # <--- ENTER THIS
if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
from absl import logging
logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.
Configurez gcloud
pour utiliser votre projet.
gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified. Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags] optional flags may be --help | --installation For detailed information on this command and its flags, run: gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'
# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery
Par défaut, Vertex Pipelines utilise le compte de service GCE VM par défaut au format [project-number]-compute@developer.gserviceaccount.com
. Nous devons autoriser ce compte à utiliser BigQuery pour accéder à BigQuery dans le pipeline. Nous ajouterons le rôle "Utilisateur BigQuery" au compte.
!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
--member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
--role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified. Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags] optional flags may be --condition | --condition-from-file | --help For detailed information on this command and its flags, run: gcloud projects add-iam-policy-binding --help
Veuillez consulter la documentation Vertex pour en savoir plus sur les comptes de service et la configuration IAM.
Créer un pipeline
Les pipelines TFX sont définis à l'aide d'API Python, comme nous l'avons fait dans le tutoriel Simple TFX Pipeline for Vertex Pipelines . Nous utilisions auparavant CsvExampleGen
qui lit les données d'un fichier CSV. Dans ce didacticiel, nous utiliserons le composant BigQueryExampleGen
qui lit les données de BigQuery.
Préparer la requête BigQuery
Nous utiliserons le même jeu de données Palmer Penguins . Cependant, nous le lirons à partir d'une table tfx-oss-public.palmer_penguins.palmer_penguins
qui est renseignée à l'aide du même fichier CSV.
Si vous utilisez Google Colab, vous pouvez examiner directement le contenu de la table BigQuery.
# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5
Toutes les caractéristiques ont déjà été normalisées à 0 ~ 1, à l'exception de l' species
qui est l'étiquette. Nous allons construire un modèle de classification qui prédit les species
de manchots.
BigQueryExampleGen
nécessite une requête pour spécifier les données à récupérer. Comme nous allons utiliser tous les champs de toutes les lignes de la table, la requête est assez simple. Vous pouvez également spécifier des noms de champs et ajouter des conditions WHERE
selon vos besoins, conformément à la syntaxe BigQuery Standard SQL .
QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"
Écrivez le code du modèle.
Nous utiliserons le même code de modèle que dans le didacticiel sur le pipeline TFX simple .
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _make_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _make_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
Copiez le fichier du module dans GCS, accessible à partir des composants du pipeline. Étant donné que l'entraînement du modèle a lieu sur GCP, nous devons importer cette définition de modèle.
Sinon, vous souhaiterez peut-être créer une image de conteneur incluant le fichier de module et utiliser l'image pour exécuter le pipeline.
gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".
Écrire une définition de pipeline
Nous allons définir une fonction pour créer un pipeline TFX. Nous devons utiliser BigQueryExampleGen
qui prend query
en argument. Un autre changement par rapport au didacticiel précédent est que nous devons transmettre beam_pipeline_args
qui est transmis aux composants lorsqu'ils sont exécutés. Nous utiliserons beam_pipeline_args
pour transmettre des paramètres supplémentaires à BigQuery.
from typing import List, Optional
def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
module_file: str, serving_model_dir: str,
beam_pipeline_args: Optional[List[str]],
) -> tfx.dsl.Pipeline:
"""Creates a TFX pipeline using BigQuery."""
# NEW: Query data in BigQuery as a data source.
example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
query=query)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a file destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components,
# NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
beam_pipeline_args=beam_pipeline_args)
Exécutez le pipeline sur Vertex Pipelines.
Nous utiliserons Vertex Pipelines pour exécuter le pipeline comme nous l'avons fait dans Simple TFX Pipeline for Vertex Pipelines Tutorial .
Nous devons également transmettre beam_pipeline_args
pour BigQueryExampleGen. Il inclut des configurations telles que le nom du projet GCP et le stockage temporaire pour l'exécution de BigQuery.
import os
# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
'--project=' + GOOGLE_CLOUD_PROJECT,
'--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
]
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
query=QUERY,
module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
serving_model_dir=SERVING_MODEL_DIR,
beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))
Le fichier de définition généré peut être soumis à l'aide du client kfp.
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
display_name=PIPELINE_NAME)
job.run(sync=False)
Vous pouvez désormais accéder à "Vertex AI > Pipelines" dans Google Cloud Console pour voir la progression.