הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת | הפעל ב-Google Cloud Vertex AI Workbench |
הדרכה מבוססת מחברת זו תיצור צינור TFX פשוט ותפעיל אותו באמצעות Google Cloud Vertex Pipelines. מחברת זו מבוססת על צינור TFX שבנינו במדריך פשוט TFX Pipeline . אם אינך מכיר את TFX ועדיין לא קראת את המדריך הזה, עליך לקרוא אותו לפני שתמשיך עם המחברת הזו.
Google Cloud Vertex Pipelines עוזר לך לבצע אוטומציה, לפקח ולנהל את מערכות ה-ML שלך על ידי תזמורת זרימת העבודה שלך ב-ML בצורה ללא שרת. אתה יכול להגדיר את צינורות ה-ML שלך באמצעות Python עם TFX, ולאחר מכן לבצע את הצינורות שלך ב-Google Cloud. עיין במבוא של Vertex Pipelines כדי ללמוד עוד על Vertex Pipelines.
מחברת זו מיועדת להפעלה ב- Google Colab או ב- AI Platform Notebooks . אם אינך משתמש באחד מאלה, תוכל פשוט ללחוץ על כפתור "הפעל ב-Google Colab" למעלה.
להכין
לפני שתפעיל מחברת זו, ודא שיש לך את הדברים הבאים:
- פרויקט Google Cloud Platform .
- דלי של Google Cloud Storage . עיין במדריך ליצירת דליים .
- הפעל את Vertex AI ו-Cloud Storage API .
אנא עיין בתיעוד של Vertex כדי להגדיר את פרויקט ה-GCP שלך בהמשך.
התקן חבילות python
נתקין חבילות Python הנדרשות כולל TFX ו-KFP כדי ליצור צינורות ML ולהגיש עבודות ל-Vertex Pipelines.
# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"
הפעלת מחדש את זמן הריצה?
אם אתה משתמש ב-Google Colab, בפעם הראשונה שאתה מפעיל את התא שלמעלה, עליך להפעיל מחדש את זמן הריצה על ידי לחיצה מעל לחצן "התחל ריצה מחדש" או שימוש בתפריט "זמן ריצה > הפעל מחדש זמן ריצה...". זה בגלל האופן שבו קולאב טוען חבילות.
אם אינך ב-Colab, תוכל להפעיל מחדש את זמן הריצה עם התא הבא.
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
היכנס ל-Google עבור מחברת זו
אם אתה מפעיל מחברת זו ב-Colab, בצע אימות עם חשבון המשתמש שלך:
import sys
if 'google.colab' in sys.modules:
from google.colab import auth
auth.authenticate_user()
אם אתה משתמש ב-AI Platform Notebooks , בצע אימות עם Google Cloud לפני הפעלת הסעיף הבא, על ידי הפעלת
gcloud auth login
בחלון המסוף (שאותו ניתן לפתוח דרך קובץ > חדש בתפריט). אתה צריך לעשות זאת רק פעם אחת בכל מופע של מחברת.
בדוק את גרסאות החבילה.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1 TFX version: 1.6.0 KFP version: 1.8.11
הגדר משתנים
אנו נגדיר כמה משתנים המשמשים להתאמה אישית של הצינורות להלן. יש צורך במידע הבא:
- מזהה פרויקט GCP. ראה זיהוי מזהה הפרויקט שלך .
- אזור GCP להפעלת צינורות. למידע נוסף על האזורים שבהם Vertex Pipelines זמין, עיין במדריך מיקומי Vertex AI .
- Google Cloud Storage Bucket לאחסון פלטי צינור.
הזן את הערכים הדרושים בתא למטה לפני הפעלתו .
GOOGLE_CLOUD_PROJECT = '' # <--- ENTER THIS
GOOGLE_CLOUD_REGION = '' # <--- ENTER THIS
GCS_BUCKET_NAME = '' # <--- ENTER THIS
if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
from absl import logging
logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.
הגדר את gcloud
לשימוש בפרויקט שלך.
gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified. Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags] optional flags may be --help | --installation For detailed information on this command and its flags, run: gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'
# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines
הכן נתונים לדוגמה
אנו נשתמש באותו מערך נתונים של Palmer Penguins כמו Simple TFX Pipeline Tutorial .
ישנן ארבע תכונות מספריות במערך נתונים זה שכבר נורמלו לטווח [0,1]. נבנה מודל סיווג אשר חוזה את species
הפינגווינים.
אנחנו צריכים ליצור עותק משלנו של מערך הנתונים. מכיוון ש-TFX ExampleGen קורא קלט מספריה, עלינו ליצור ספרייה ולהעתיק אליה מערך נתונים ב-GCS.
gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".
עיין במהירות בקובץ ה-CSV.
gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".
צור צינור
צינורות TFX מוגדרים באמצעות ממשקי API של Python. נגדיר צינור שמורכב משלושה רכיבים, CsvExampleGen, Trainer ו-Pusher. הגדרת הצינור והמודל זהה כמעט ל- Simple TFX Pipeline Tutorial .
ההבדל היחיד הוא שאנחנו לא צריכים להגדיר metadata_connection_config
המשמש לאיתור מסד נתונים של ML Metadata . מכיוון ש-Vertex Pipelines משתמש בשירות מטא נתונים מנוהל, המשתמשים לא צריכים לטפל בו, ואנחנו לא צריכים לציין את הפרמטר.
לפני הגדרת הצינור בפועל, עלינו לכתוב תחילה קוד דגם עבור רכיב ה-Trainer.
כתוב קוד דגם.
נשתמש באותו קוד דגם כמו במדריך Simple TFX Pipeline .
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _make_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _make_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
העתק את קובץ המודול ל-GCS שניתן לגשת אליו ממרכיבי הצינור. מכיוון שאימון מודלים מתרחש ב-GCP, עלינו להעלות את הגדרת המודל הזו.
אחרת, ייתכן שתרצה לבנות תמונת מיכל הכוללת את קובץ המודול ולהשתמש בתמונה כדי להפעיל את הצינור.
gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".
כתוב הגדרת צינור
נגדיר פונקציה ליצירת צינור TFX.
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
) -> tfx.dsl.Pipeline:
"""Creates a three component penguin pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components)
הפעל את הצינור ב-Vertex Pipelines.
השתמשנו LocalDagRunner
שפועל על סביבה מקומית במדריך Simple TFX Pipeline . TFX מספק מתזמרים מרובים להפעיל את הצינור שלך. במדריך זה נשתמש ב-Vertex Pipelines יחד עם ה-Dag Runner Kubeflow V2.
אנחנו צריכים להגדיר רץ כדי להפעיל את הצינור בפועל. אתה תרכיב את הצינור שלך לפורמט הגדרת הצינור שלנו באמצעות ממשקי API של TFX.
import os
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
serving_model_dir=SERVING_MODEL_DIR))
ניתן להגיש את קובץ ההגדרות שנוצר באמצעות לקוח kfp.
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
display_name=PIPELINE_NAME)
job.run(sync=False)
עכשיו אתה יכול לבקר ב'Vertex AI > Pipelines' ב- Google Cloud Console כדי לראות את ההתקדמות.