ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค | ทำงานใน Google Cloud Vertex AI Workbench |
บทช่วยสอนที่ใช้สมุดบันทึกนี้จะสร้างไปป์ไลน์ TFX อย่างง่าย และเรียกใช้โดยใช้ไปป์ไลน์ Google Cloud Vertex สมุดบันทึกนี้อิงตามไปป์ไลน์ TFX ที่เราสร้างขึ้นใน Simple TFX Pipeline Tutorial หากคุณไม่คุ้นเคยกับ TFX และยังไม่ได้อ่านบทช่วยสอนนั้น คุณควรอ่านก่อนที่จะดำเนินการกับสมุดบันทึกนี้
Google Cloud Vertex Pipelines ช่วยให้คุณสร้างระบบอัตโนมัติ ตรวจสอบ และควบคุมระบบ ML ของคุณโดยจัดการเวิร์กโฟลว์ ML ของคุณแบบไร้เซิร์ฟเวอร์ คุณสามารถกำหนดไปป์ไลน์ ML ของคุณโดยใช้ Python กับ TFX จากนั้นดำเนินการไปป์ไลน์ของคุณบน Google Cloud ดู การแนะนำ Vertex Pipelines เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับ Vertex Pipelines
สมุดบันทึกนี้จัดทำขึ้นเพื่อใช้งานบน Google Colab หรือบน AI Platform Notebooks หากคุณไม่ได้ใช้อย่างใดอย่างหนึ่งเหล่านี้ คุณสามารถคลิกปุ่ม "เรียกใช้ใน Google Colab" ด้านบน
ติดตั้ง
ก่อนที่คุณจะเรียกใช้สมุดบันทึกนี้ ตรวจสอบให้แน่ใจว่าคุณได้ปฏิบัติตาม:
- โครงการ Google Cloud Platform
- ที่เก็บข้อมูล Google Cloud Storage ดู คำแนะนำสำหรับการสร้างที่เก็บข้อมูล
- เปิดใช้งาน Vertex AI และ Cloud Storage API
โปรดดู เอกสาร Vertex เพื่อกำหนดค่าโครงการ GCP ของคุณเพิ่มเติม
ติดตั้งแพ็คเกจหลาม
เราจะติดตั้งแพ็คเกจ Python ที่จำเป็น รวมถึง TFX และ KFP เพื่อสร้างไพพ์ไลน์ ML และส่งงานไปยัง Vertex Pipelines
# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"
คุณรีสตาร์ทรันไทม์หรือไม่
หากคุณกำลังใช้ Google Colab ในครั้งแรกที่คุณเรียกใช้เซลล์ด้านบน คุณต้องรีสตาร์ทรันไทม์โดยคลิกที่ปุ่ม "RESTART RUNTIME" ด้านบน หรือใช้เมนู "Runtime > Restart runtime ..." นี่เป็นเพราะวิธีที่ Colab โหลดแพ็คเกจ
หากคุณไม่ได้ใช้งาน Colab คุณสามารถรีสตาร์ทรันไทม์ได้ด้วยเซลล์ต่อไปนี้
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
ลงชื่อเข้าใช้ Google สำหรับโน้ตบุ๊กนี้
หากคุณกำลังใช้งานสมุดบันทึกนี้บน Colab ให้ตรวจสอบสิทธิ์ด้วยบัญชีผู้ใช้ของคุณ:
import sys
if 'google.colab' in sys.modules:
from google.colab import auth
auth.authenticate_user()
หากคุณใช้ AI Platform Notebooks ให้ตรวจสอบสิทธิ์กับ Google Cloud ก่อนเรียกใช้ส่วนถัดไปโดยเรียกใช้
gcloud auth login
ในหน้าต่าง Terminal (ซึ่งคุณสามารถเปิดได้ทาง File > New ในเมนู) คุณต้องทำเช่นนี้เพียงครั้งเดียวต่ออินสแตนซ์โน้ตบุ๊ก
ตรวจสอบเวอร์ชั่นของแพ็คเกจ
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1 TFX version: 1.6.0 KFP version: 1.8.11
ตั้งค่าตัวแปร
เราจะตั้งค่าตัวแปรบางตัวที่ใช้ในการปรับแต่งไปป์ไลน์ด้านล่าง ต้องการข้อมูลต่อไปนี้:
- รหัสโปรเจ็กต์ GCP ดู การระบุรหัสโปรเจ็ กต์ของคุณ
- ภูมิภาค GCP เพื่อเรียกใช้ไปป์ไลน์ สำหรับข้อมูลเพิ่มเติมเกี่ยวกับภูมิภาคต่างๆ ที่ Vertex Pipelines มีให้บริการ โปรดดู ที่ คู่มือสถานที่ตั้ง Vertex AI
- Google Cloud Storage Bucket เพื่อจัดเก็บเอาต์พุตไปป์ไลน์
ป้อนค่าที่จำเป็นในเซลล์ด้านล่างก่อนเรียกใช้
GOOGLE_CLOUD_PROJECT = '' # <--- ENTER THIS
GOOGLE_CLOUD_REGION = '' # <--- ENTER THIS
GCS_BUCKET_NAME = '' # <--- ENTER THIS
if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
from absl import logging
logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.
ตั้งค่า gcloud
เพื่อใช้โปรเจ็กต์ของคุณ
gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified. Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags] optional flags may be --help | --installation For detailed information on this command and its flags, run: gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-pipelines'
# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
GCS_BUCKET_NAME, PIPELINE_NAME)
print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-pipelines
เตรียมข้อมูลตัวอย่าง
เราจะใช้ ชุดข้อมูล Palmer Penguins เดียวกันกับ Simple TFX Pipeline Tutorial
มีคุณลักษณะที่เป็นตัวเลขสี่ชุดในชุดข้อมูลนี้ซึ่งถูกทำให้เป็นมาตรฐานแล้วให้มีช่วง [0,1] เราจะสร้างแบบจำลองการจำแนกประเภทที่ทำนาย species
ของนกเพนกวิน
เราจำเป็นต้องทำสำเนาชุดข้อมูลของเราเอง เนื่องจาก TFX ExampleGen อ่านอินพุตจากไดเร็กทอรี เราจึงต้องสร้างไดเร็กทอรีและคัดลอกชุดข้อมูลไปที่ GCS
gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/".
ดูไฟล์ CSV อย่างรวดเร็ว
gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-pipelines/penguins_processed.csv".
สร้างไปป์ไลน์
ไปป์ไลน์ TFX ถูกกำหนดโดยใช้ Python API เราจะกำหนดไปป์ไลน์ซึ่งประกอบด้วยสามองค์ประกอบ CsvExampleGen, Trainer และ Pusher คำจำกัดความของไปป์ไลน์และโมเดลเกือบจะเหมือนกับ Simple TFX Pipeline Tutorial
ข้อแตกต่างเพียงอย่างเดียวคือเราไม่จำเป็นต้องตั้งค่า metadata_connection_config
ซึ่งใช้เพื่อค้นหาฐานข้อมูล เมตาดาต้า ML เนื่องจาก Vertex Pipelines ใช้บริการข้อมูลเมตาที่มีการจัดการ ผู้ใช้จึงไม่จำเป็นต้องดูแล และเราไม่จำเป็นต้องระบุพารามิเตอร์
ก่อนกำหนดไปป์ไลน์ เราต้องเขียนโค้ดโมเดลสำหรับคอมโพเนนต์ Trainer ก่อน
เขียนรหัสรุ่น
เราจะใช้รหัสรุ่นเดียวกันกับใน Simple TFX Pipeline Tutorial
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _make_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _make_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
คัดลอกไฟล์โมดูลไปยัง GCS ซึ่งสามารถเข้าถึงได้จากส่วนประกอบไปป์ไลน์ เนื่องจากการฝึกโมเดลเกิดขึ้นใน GCP เราจึงต้องอัปโหลดคำจำกัดความของโมเดลนี้
มิฉะนั้น คุณอาจต้องการสร้างอิมเมจคอนเทนเนอร์รวมถึงไฟล์โมดูล และใช้อิมเมจเพื่อรันไปป์ไลน์
gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-pipelines/".
เขียนนิยามไปป์ไลน์
เราจะกำหนดฟังก์ชันเพื่อสร้างไปป์ไลน์ TFX
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
) -> tfx.dsl.Pipeline:
"""Creates a three component penguin pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components)
เรียกใช้ไปป์ไลน์บน Vertex Pipelines
เราใช้ LocalDagRunner
ซึ่งทำงานบนสภาพแวดล้อมท้องถิ่นใน Simple TFX Pipeline Tutorial TFX มี orchestrator หลายตัวเพื่อดำเนินการไปป์ไลน์ของคุณ ในบทช่วยสอนนี้ เราจะใช้ Vertex Pipelines ร่วมกับ Kubeflow V2 dag runner
เราจำเป็นต้องกำหนดนักวิ่งเพื่อเรียกใช้ไปป์ไลน์จริงๆ คุณจะคอมไพล์ไปป์ไลน์ของคุณเป็นรูปแบบคำจำกัดความไปป์ไลน์ของเราโดยใช้ TFX API
import os
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
serving_model_dir=SERVING_MODEL_DIR))
ไฟล์คำจำกัดความที่สร้างขึ้นสามารถส่งได้โดยใช้ไคลเอนต์ kfp
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
display_name=PIPELINE_NAME)
job.run(sync=False)
ตอนนี้คุณสามารถไปที่ 'Vertex AI > Pipelines' ใน Google Cloud Console เพื่อดูความคืบหน้า