การอ่านข้อมูลจาก BigQuery ด้วย TFX และ Vertex Pipelines

จัดทุกอย่างให้เป็นระเบียบอยู่เสมอด้วยคอลเล็กชัน บันทึกและจัดหมวดหมู่เนื้อหาตามค่ากำหนดของคุณ

บทแนะนำที่ใช้สมุดบันทึกนี้จะใช้ Google Cloud BigQuery เป็นแหล่งข้อมูลเพื่อฝึกโมเดล ML ML ไปป์ไลน์จะถูกสร้างขึ้นโดยใช้ TFX และทำงานบน Google Cloud Vertex Pipelines

สมุดบันทึกนี้อิงตามไปป์ไลน์ TFX ที่เราสร้างใน Simple TFX Pipeline สำหรับ Vertex Pipelines Tutorial หากคุณยังไม่ได้อ่านบทช่วยสอนนั้น คุณควรอ่านก่อนดำเนินการกับสมุดบันทึกนี้

BigQuery เป็นคลังข้อมูลมัลติคลาวด์แบบไม่ต้องใช้เซิร์ฟเวอร์ ปรับขนาดได้สูง และคุ้มค่าใช้จ่าย ออกแบบมาเพื่อความคล่องตัวทางธุรกิจ สามารถใช้ TFX เพื่ออ่านข้อมูลการฝึกอบรมจาก BigQuery และ เผยแพร่แบบจำลองที่ได้รับการฝึกอบรม ไปยัง BigQuery

ในบทช่วยสอนนี้ เราจะใช้คอมโพเนนต์ BigQueryExampleGen ซึ่งอ่านข้อมูลจากไปป์ไลน์ BigQuery ถึง TFX

สมุดบันทึกนี้จัดทำขึ้นเพื่อใช้งานบน Google Colab หรือบน AI Platform Notebooks หากคุณไม่ได้ใช้อย่างใดอย่างหนึ่งเหล่านี้ คุณสามารถคลิกปุ่ม "เรียกใช้ใน Google Colab" ด้านบน

ติดตั้ง

หากคุณทำ Simple TFX Pipeline สำหรับ Vertex Pipelines Tutorial เสร็จแล้ว คุณจะมีโปรเจ็กต์ GCP ที่ใช้งานได้และบัคเก็ต GCS และนั่นคือทั้งหมดที่เราต้องการสำหรับบทช่วยสอนนี้ โปรดอ่านบทแนะนำเบื้องต้นก่อนหากคุณพลาด

ติดตั้งแพ็คเกจหลาม

เราจะติดตั้งแพ็คเกจ Python ที่จำเป็น รวมถึง TFX และ KFP เพื่อสร้างไพพ์ไลน์ ML และส่งงานไปยัง Vertex Pipelines

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

คุณรีสตาร์ทรันไทม์หรือไม่

หากคุณกำลังใช้ Google Colab ในครั้งแรกที่คุณเรียกใช้เซลล์ด้านบน คุณต้องรีสตาร์ทรันไทม์โดยคลิกที่ปุ่ม "RESTART RUNTIME" ด้านบน หรือใช้เมนู "Runtime > Restart runtime ..." นี่เป็นเพราะวิธีที่ Colab โหลดแพ็คเกจ

หากคุณไม่ได้ใช้งาน Colab คุณสามารถรีสตาร์ทรันไทม์ได้ด้วยเซลล์ต่อไปนี้

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

ลงชื่อเข้าใช้ Google สำหรับโน้ตบุ๊กนี้

หากคุณกำลังใช้งานสมุดบันทึกนี้บน Colab ให้ตรวจสอบสิทธิ์ด้วยบัญชีผู้ใช้ของคุณ:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

หากคุณใช้ AI Platform Notebooks ให้ตรวจสอบสิทธิ์กับ Google Cloud ก่อนเรียกใช้ส่วนถัดไปโดยเรียกใช้

gcloud auth login

ในหน้าต่าง Terminal (ซึ่งคุณสามารถเปิดได้ทาง File > New ในเมนู) คุณต้องทำเช่นนี้เพียงครั้งเดียวต่ออินสแตนซ์โน้ตบุ๊ก

ตรวจสอบเวอร์ชั่นของแพ็คเกจ

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

ตั้งค่าตัวแปร

เราจะตั้งค่าตัวแปรบางตัวที่ใช้ในการปรับแต่งไปป์ไลน์ด้านล่าง ต้องการข้อมูลต่อไปนี้:

ป้อนค่าที่จำเป็นในเซลล์ด้านล่างก่อนเรียกใช้

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

ตั้งค่า gcloud เพื่อใช้โปรเจ็กต์ของคุณ

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

โดยค่าเริ่มต้น Vertex Pipelines จะใช้บัญชีบริการ GCE VM เริ่มต้นของรูปแบบ [project-number]-compute@developer.gserviceaccount.com เราจำเป็นต้องให้สิทธิ์ในการใช้ BigQuery กับบัญชีนี้เพื่อเข้าถึง BigQuery ในไปป์ไลน์ เราจะเพิ่มบทบาท 'ผู้ใช้ BigQuery' ให้กับบัญชี

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

โปรดดู เอกสาร Vertex เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับบัญชีบริการและการกำหนดค่า IAM

สร้างไปป์ไลน์

ไปป์ไลน์ TFX ถูกกำหนดโดยใช้ Python API ตามที่เราทำใน Simple TFX Pipeline สำหรับ Vertex Pipelines Tutorial ก่อนหน้านี้เราใช้ CsvExampleGen ซึ่งอ่านข้อมูลจากไฟล์ CSV ในบทช่วยสอนนี้ เราจะใช้คอมโพเนนต์ BigQueryExampleGen ซึ่งอ่านข้อมูลจาก BigQuery

เตรียมการสืบค้นข้อมูล BigQuery

เราจะใช้ ชุดข้อมูล Palmer Penguins เดียวกัน อย่างไรก็ตาม เราจะอ่านจากตาราง BigQuery tfx-oss-public.palmer_penguins.palmer_penguins ซึ่งสร้างโดยใช้ไฟล์ CSV เดียวกัน

หากคุณกำลังใช้ Google Colab คุณสามารถตรวจสอบเนื้อหาของตาราง BigQuery ได้โดยตรง

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

คุณลักษณะทั้งหมดถูกทำให้เป็นมาตรฐานแล้วเป็น 0 ~ 1 ยกเว้นสปี species ส์ซึ่งเป็นฉลาก เราจะสร้างแบบจำลองการจำแนกประเภทที่ทำนาย species ของนกเพนกวิน

BigQueryExampleGen ต้องการคำค้นหาเพื่อระบุว่าจะดึงข้อมูลใด เนื่องจากเราจะใช้เขตข้อมูลทั้งหมดของแถวทั้งหมดในตาราง แบบสอบถามจึงค่อนข้างง่าย คุณยังระบุชื่อช่องและเพิ่มเงื่อนไข WHERE ได้ตามต้องการตาม ไวยากรณ์ BigQuery Standard SQL

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

เขียนรหัสรุ่น

เราจะใช้รหัสรุ่นเดียวกันกับใน Simple TFX Pipeline Tutorial

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

คัดลอกไฟล์โมดูลไปยัง GCS ซึ่งสามารถเข้าถึงได้จากส่วนประกอบไปป์ไลน์ เนื่องจากการฝึกโมเดลเกิดขึ้นใน GCP เราจึงต้องอัปโหลดคำจำกัดความของโมเดลนี้

มิฉะนั้น คุณอาจต้องการสร้างอิมเมจคอนเทนเนอร์รวมถึงไฟล์โมดูล และใช้อิมเมจเพื่อรันไปป์ไลน์

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

เขียนนิยามไปป์ไลน์

เราจะกำหนดฟังก์ชันเพื่อสร้างไปป์ไลน์ TFX เราจำเป็นต้องใช้ BigQueryExampleGen ซึ่งรับ query เป็นอาร์กิวเมนต์ อีกหนึ่งการเปลี่ยนแปลงจากบทช่วยสอนก่อนหน้านี้คือ เราจำเป็นต้องส่ง beam_pipeline_args ซึ่งส่งผ่านไปยังส่วนประกอบเมื่อดำเนินการ เราจะใช้ beam_pipeline_args เพื่อส่งพารามิเตอร์เพิ่มเติมไปยัง BigQuery

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

เรียกใช้ไปป์ไลน์บน Vertex Pipelines

เราจะใช้ Vertex Pipelines เพื่อเรียกใช้ไปป์ไลน์ตามที่เราทำใน Simple TFX Pipeline สำหรับ Vertex Pipelines Tutorial

นอกจากนี้เรายังต้องส่ง beam_pipeline_args สำหรับ BigQueryExampleGen รวมถึงการกำหนดค่าต่างๆ เช่น ชื่อของโปรเจ็กต์ GCP และพื้นที่เก็บข้อมูลชั่วคราวสำหรับการดำเนินการ BigQuery

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

ไฟล์คำจำกัดความที่สร้างขึ้นสามารถส่งได้โดยใช้ไคลเอนต์ kfp

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

ตอนนี้คุณสามารถไปที่ 'Vertex AI > Pipelines' ใน Google Cloud Console เพื่อดูความคืบหน้า