קריאת נתונים מ-BigQuery עם TFX ו-Vertex Pipelines

קל לארגן דפים בעזרת אוספים אפשר לשמור ולסווג תוכן על סמך ההעדפות שלך.

מדריך זה מבוסס מחברת ישתמש ב- Google Cloud BigQuery כמקור נתונים כדי להכשיר מודל ML. צינור ה-ML ייבנה באמצעות TFX ויופעל על Google Cloud Vertex Pipelines.

מחברת זו מבוססת על צינור TFX שבנינו במדריך Simple TFX Pipeline for Vertex Pipelines . אם עדיין לא קראת את המדריך הזה, עליך לקרוא אותו לפני שתמשיך עם מחברת זו.

BigQuery הוא מחסן נתונים מרובה עננים ללא שרת, ניתן להרחבה וחסכוני המיועד לזריזות עסקית. ניתן להשתמש ב-TFX לקריאת נתוני אימון מ-BigQuery ולפרסום המודל המאומן ב-BigQuery.

במדריך זה, נשתמש ברכיב BigQueryExampleGen שקורא נתונים מ-BigQuery לצינורות TFX.

מחברת זו מיועדת להפעלה ב- Google Colab או ב- AI Platform Notebooks . אם אינך משתמש באחד מאלה, תוכל פשוט ללחוץ על כפתור "הפעל ב-Google Colab" למעלה.

להכין

אם השלמת Simple TFX Pipeline for Vertex Pipelines Tutorial , יהיה לך פרויקט GCP עובד ודלי GCS וזה כל מה שאנחנו צריכים עבור הדרכה זו. אנא קרא תחילה את המדריך המקדים אם פספסת אותו.

התקן חבילות python

נתקין חבילות Python הנדרשות כולל TFX ו-KFP כדי ליצור צינורות ML ולהגיש עבודות ל-Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

הפעלת מחדש את זמן הריצה?

אם אתה משתמש ב-Google Colab, בפעם הראשונה שאתה מפעיל את התא שלמעלה, עליך להפעיל מחדש את זמן הריצה על ידי לחיצה מעל לחצן "התחל ריצה מחדש" או שימוש בתפריט "זמן ריצה > הפעל מחדש זמן ריצה...". זה בגלל האופן שבו קולאב טוען חבילות.

אם אינך ב-Colab, תוכל להפעיל מחדש את זמן הריצה עם התא הבא.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

היכנס ל-Google עבור מחברת זו

אם אתה מפעיל מחברת זו ב-Colab, בצע אימות עם חשבון המשתמש שלך:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

אם אתה משתמש ב-AI Platform Notebooks , בצע אימות עם Google Cloud לפני הפעלת הסעיף הבא, על ידי הפעלת

gcloud auth login

בחלון המסוף (שאותו ניתן לפתוח דרך קובץ > חדש בתפריט). אתה צריך לעשות זאת רק פעם אחת בכל מופע של מחברת.

בדוק את גרסאות החבילה.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

הגדר משתנים

אנו נגדיר כמה משתנים המשמשים להתאמה אישית של הצינורות להלן. יש צורך במידע הבא:

הזן את הערכים הדרושים בתא למטה לפני הפעלתו .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

הגדר את gcloud לשימוש בפרויקט שלך.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

כברירת מחדל, Vertex Pipelines משתמש בחשבון שירות GCE VM המוגדר כברירת מחדל בפורמט [project-number]-compute@developer.gserviceaccount.com . אנחנו צריכים לתת הרשאה להשתמש ב-BigQuery לחשבון הזה כדי לגשת ל-BigQuery בצינור. נוסיף את תפקיד 'משתמש BigQuery' לחשבון.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

אנא עיין בתיעוד של Vertex כדי ללמוד עוד על חשבונות שירות ותצורת IAM.

צור צינור

צינורות TFX מוגדרים באמצעות ממשקי API של Python כפי שעשינו במדריך Simple TFX Pipeline for Vertex Pipelines . השתמשנו בעבר CsvExampleGen שקורא נתונים מקובץ CSV. במדריך זה, נשתמש ברכיב BigQueryExampleGen שקורא נתונים מ-BigQuery.

הכן שאילתת BigQuery

נשתמש באותו מערך נתונים של Palmer Penguins . עם זאת, אנו נקרא אותו מטבלת BigQuery tfx-oss-public.palmer_penguins.palmer_penguins אשר מאוכלסת באמצעות אותו קובץ CSV.

אם אתה משתמש ב-Google Colab, אתה יכול לבחון את התוכן של טבלת BigQuery ישירות.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

כל התכונות כבר היו מנורמלות ל-0~1 מלבד species שהוא התווית. נבנה מודל סיווג אשר חוזה את species הפינגווינים.

BigQueryExampleGen דורש שאילתה כדי לציין אילו נתונים לאחזר. מכיוון שנשתמש בכל השדות של כל השורות בטבלה, השאילתה די פשוטה. אתה יכול גם לציין שמות שדות ולהוסיף תנאי WHERE לפי הצורך לפי תחביר BigQuery Standard SQL .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

כתוב קוד דגם.

נשתמש באותו קוד דגם כמו במדריך Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

העתק את קובץ המודול ל-GCS שניתן לגשת אליו ממרכיבי הצינור. מכיוון שאימון מודלים מתרחש ב-GCP, עלינו להעלות את הגדרת המודל הזו.

אחרת, ייתכן שתרצה לבנות תמונת מיכל הכוללת את קובץ המודול ולהשתמש בתמונה כדי להפעיל את הצינור.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

כתוב הגדרת צינור

נגדיר פונקציה ליצירת צינור TFX. עלינו להשתמש ב- BigQueryExampleGen שלוקח query כארגומנט. עוד שינוי אחד מהמדריך הקודם הוא שאנחנו צריכים להעביר את beam_pipeline_args שמועבר לרכיבים כשהם מבוצעים. נשתמש ב- beam_pipeline_args כדי להעביר פרמטרים נוספים ל-BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

הפעל את הצינור ב-Vertex Pipelines.

אנו נשתמש ב-Vertex Pipelines כדי להפעיל את הצינור כפי שעשינו במדריך Simple TFX Pipeline for Vertex Pipelines .

אנחנו צריכים גם להעביר את beam_pipeline_args עבור BigQueryExampleGen. זה כולל הגדרות כמו שם פרויקט GCP והאחסון הזמני לביצוע BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

ניתן להגיש את קובץ ההגדרות שנוצר באמצעות לקוח kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

עכשיו אתה יכול לבקר ב'Vertex AI > Pipelines' ב- Google Cloud Console כדי לראות את ההתקדמות.