![]() | ![]() | ![]() | ![]() | ![]() |
Questo tutorial basato su notebook creerà una semplice pipeline TFX e la eseguirà utilizzando Google Cloud Vertex Pipelines. Questo notebook è basato sulla pipeline TFX che abbiamo creato in Simple TFX Pipeline Tutorial . Se non hai familiarità con TFX e non hai ancora letto quel tutorial, dovresti leggerlo prima di procedere con questo notebook.
Google Cloud Vertex Pipelines ti aiuta ad automatizzare, monitorare e governare i tuoi sistemi ML orchestrando il tuo flusso di lavoro ML in modo serverless. Puoi definire le tue pipeline ML utilizzando Python con TFX, quindi eseguire le tue pipeline su Google Cloud. Consulta l' introduzione di Vertex Pipelines per ulteriori informazioni sulle Vertex Pipelines.
Questo notebook è progettato per essere eseguito su Google Colab o sui notebook AI Platform . Se non stai utilizzando uno di questi, puoi semplicemente fare clic sul pulsante "Esegui in Google Colab" sopra.
Impostare
Prima di eseguire questo notebook, assicurarsi di disporre di quanto segue:
- Un progetto Google Cloud Platform .
- Un bucket di Google Cloud Storage . Consulta la guida per la creazione di bucket .
- Abilita Vertex AI e API Cloud Storage .
Consulta la documentazione di Vertex per configurare ulteriormente il tuo progetto GCP.
Installa pacchetti Python
Installeremo i pacchetti Python richiesti, inclusi TFX e KFP, per creare pipeline ML e inviare lavori a Vertex Pipelines.
# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"
Hai riavviato il runtime?
Se stai utilizzando Google Colab, la prima volta che esegui la cella sopra, devi riavviare il runtime facendo clic sopra il pulsante "RIAVVIA RUNTIME" o utilizzando il menu "Runtime > Riavvia runtime...". Ciò è dovuto al modo in cui Colab carica i pacchetti.
Se non sei su Colab, puoi riavviare il runtime con la cella seguente.
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Accedi a Google per questo notebook
Se stai eseguendo questo notebook su Colab, autenticati con il tuo account utente:
import sys
if 'google.colab' in sys.modules:
from google.colab import auth
auth.authenticate_user()
Se utilizzi AI Platform Notebooks , autenticati con Google Cloud prima di eseguire la sezione successiva, eseguendo
gcloud auth login
nella finestra Terminale (che puoi aprire tramite File > Nuovo nel menu). Devi farlo solo una volta per istanza notebook.
Controlla le versioni del pacchetto.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1 TFX version: 1.6.0 KFP version: 1.8.11
Imposta variabili
Imposteremo alcune variabili utilizzate per personalizzare le pipeline di seguito. Sono richieste le seguenti informazioni:
- ID progetto GCP. Vedere Identificazione dell'ID progetto .
- Regione GCP per eseguire pipeline. Per ulteriori informazioni sulle regioni in cui è disponibile Vertex Pipelines, vedere la guida alle posizioni di Vertex AI .
- Google Cloud Storage Bucket per archiviare gli output della pipeline.
Immettere i valori richiesti nella cella sottostante prima di eseguirlo .
GOOGLE_CLOUD_PROJECT = '' # <--- ENTER THIS
GOOGLE_CLOUD_REGION = '' # <--- ENTER THIS
GCS_BUCKET_NAME = '' # <--- ENTER THIS
if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
from absl import logging
logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.
Imposta gcloud
per utilizzare il tuo progetto.
gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified. Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags] optional flags may be --help | --installation For detailed information on this command and its flags, run: gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'
# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines
Preparare dati di esempio
Useremo lo stesso set di dati di Palmer Penguins come Simple TFX Pipeline Tutorial .
Ci sono quattro caratteristiche numeriche in questo set di dati che erano già normalizzate per avere un intervallo [0,1]. Costruiremo un modello di classificazione che predice le species
di pinguini.
Dobbiamo creare la nostra copia del set di dati. Poiché TFX ExampleGen legge gli input da una directory, è necessario creare una directory e copiarvi il set di dati su GCS.
gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".
Dai una rapida occhiata al file CSV.
gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".
Crea una pipeline
Le pipeline TFX sono definite utilizzando le API Python. Definiremo una pipeline composta da tre componenti, CsvExampleGen, Trainer e Pusher. La definizione della pipeline e del modello è quasi la stessa di Simple TFX Pipeline Tutorial .
L'unica differenza è che non è necessario impostare metadata_connection_config
che viene utilizzato per individuare il database di metadati ML . Poiché Vertex Pipelines utilizza un servizio di metadati gestito, gli utenti non devono occuparsene e non è necessario specificare il parametro.
Prima di definire effettivamente la pipeline, è necessario prima scrivere un codice modello per il componente Trainer.
Scrivi il codice del modello.
Utilizzeremo lo stesso codice del modello del tutorial Simple TFX Pipeline .
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _make_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _make_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
Copiare il file del modulo in GCS a cui è possibile accedere dai componenti della pipeline. Poiché l'addestramento del modello avviene su GCP, è necessario caricare questa definizione del modello.
In caso contrario, potresti voler creare un'immagine del contenitore che includa il file del modulo e utilizzare l'immagine per eseguire la pipeline.
gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".
Scrivi una definizione di pipeline
Definiremo una funzione per creare una pipeline TFX.
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
) -> tfx.dsl.Pipeline:
"""Creates a three component penguin pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components)
Eseguire la pipeline su Vertex Pipelines.
Abbiamo usato LocalDagRunner
che funziona in ambiente locale in Simple TFX Pipeline Tutorial . TFX fornisce più agenti di orchestrazione per eseguire la pipeline. In questo tutorial useremo le Vertex Pipelines insieme al dag runner Kubeflow V2.
Dobbiamo definire un corridore per eseguire effettivamente la pipeline. Compilerai la tua pipeline nel nostro formato di definizione della pipeline utilizzando le API TFX.
import os
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
serving_model_dir=SERVING_MODEL_DIR))
Il file di definizione generato può essere inviato utilizzando il client kfp.
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
display_name=PIPELINE_NAME)
job.run(sync=False)
Ora puoi visitare "Vertex AI > Pipelines" in Google Cloud Console per vedere i progressi.