خواندن داده ها از BigQuery با خطوط لوله TFX و Vertex

این آموزش مبتنی بر نوت بوک از Google Cloud BigQuery به عنوان منبع داده برای آموزش یک مدل ML استفاده می کند. خط لوله ML با استفاده از TFX ساخته می شود و در Google Cloud Vertex Pipelines اجرا می شود.

این نوت بوک بر اساس خط لوله TFX است که ما در آموزش Simple TFX Pipeline for Vertex Pipelines ساخته ایم. اگر هنوز آن آموزش را نخوانده اید، باید قبل از ادامه با این دفترچه آن را بخوانید.

BigQuery یک انبار داده چند ابری بدون سرور، بسیار مقیاس پذیر و مقرون به صرفه است که برای چابکی تجاری طراحی شده است. از TFX می توان برای خواندن داده های آموزشی از BigQuery و انتشار مدل آموزش دیده در BigQuery استفاده کرد.

در این آموزش از کامپوننت BigQueryExampleGen استفاده می کنیم که داده ها را از BigQuery به خطوط لوله TFX می خواند.

این نوت بوک برای اجرا در گوگل کولب یا نوت بوک های پلتفرم هوش مصنوعی در نظر گرفته شده است. اگر از یکی از این موارد استفاده نمی کنید، می توانید به سادگی روی دکمه "Run in Google Colab" در بالا کلیک کنید.

برپایی

اگر آموزش Simple TFX Pipeline for Vertex Pipelines را کامل کرده اید، یک پروژه GCP و یک سطل GCS خواهید داشت و این تمام چیزی است که ما برای این آموزش نیاز داریم. لطفاً اگر آموزش اولیه را از دست دادید، ابتدا مطالعه کنید.

بسته های پایتون را نصب کنید

ما بسته‌های Python مورد نیاز از جمله TFX و KFP را برای نگارش خطوط لوله ML نصب می‌کنیم و کارها را به Vertex Pipelines ارسال می‌کنیم.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

آیا زمان اجرا را مجدداً راه اندازی کردید؟

اگر از Google Colab استفاده می‌کنید، اولین باری که سلول بالا را اجرا می‌کنید، باید با کلیک کردن روی دکمه «راه‌اندازی مجدد زمان اجرا» یا با استفاده از منوی «زمان اجرا > زمان اجرا مجدد ...» زمان اجرا را مجدداً راه‌اندازی کنید. این به دلیل روشی است که Colab بسته ها را بارگذاری می کند.

اگر در Colab نیستید، می توانید زمان اجرا را با سلول زیر راه اندازی مجدد کنید.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

برای این نوت بوک وارد گوگل شوید

اگر این نوت بوک را در Colab اجرا می کنید، با حساب کاربری خود احراز هویت کنید:

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

اگر از نوت‌بوک‌های پلتفرم هوش مصنوعی استفاده می‌کنید، قبل از اجرای بخش بعدی، با استفاده از Google Cloud احراز هویت کنید.

gcloud auth login

در پنجره ترمینال (که می توانید از طریق File > New در منو باز کنید). شما فقط باید این کار را یک بار در هر نوت بوک انجام دهید.

نسخه های بسته را بررسی کنید.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

متغیرها را تنظیم کنید

ما برخی از متغیرهای مورد استفاده برای سفارشی کردن خطوط لوله را در زیر تنظیم خواهیم کرد. اطلاعات زیر مورد نیاز است:

قبل از اجرای آن، مقادیر مورد نیاز را در سلول زیر وارد کنید .

GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

gcloud را برای استفاده از پروژه خود تنظیم کنید.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

به طور پیش فرض Vertex Pipelines از حساب پیش فرض سرویس GCE VM با فرمت [project-number]-compute@developer.gserviceaccount.com استفاده می کند. ما باید مجوز استفاده از BigQuery را به این حساب برای دسترسی به BigQuery در خط لوله بدهیم. نقش "کاربر BigQuery" را به حساب اضافه می کنیم.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help

لطفاً برای کسب اطلاعات بیشتر در مورد حساب‌های سرویس و پیکربندی IAM به اسناد Vertex مراجعه کنید.

یک خط لوله ایجاد کنید

خطوط لوله TFX با استفاده از API های پایتون تعریف می شوند همانطور که در آموزش Simple TFX Pipeline for Vertex Pipelines انجام دادیم. ما قبلاً از CsvExampleGen استفاده می کردیم که داده ها را از یک فایل CSV می خواند. در این آموزش از کامپوننت BigQueryExampleGen استفاده می کنیم که داده ها را از BigQuery می خواند.

پرس و جو BigQuery را آماده کنید

ما از همان مجموعه داده Palmer Penguins استفاده خواهیم کرد. با این حال، ما آن را از جدول BigQuery tfx-oss-public.palmer_penguins.palmer_penguins می خوانیم که با استفاده از همان فایل CSV پر شده است.

اگر از Google Colab استفاده می کنید، می توانید محتوای جدول BigQuery را مستقیماً بررسی کنید.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

همه ویژگی‌ها قبلاً به 0 تا 1 نرمال شده بودند به جز species که برچسب هستند. ما یک مدل طبقه بندی خواهیم ساخت که species پنگوئن ها را پیش بینی می کند.

BigQueryExampleGen به یک پرس و جو نیاز دارد تا مشخص کند کدام داده باید واکشی شود. از آنجایی که ما از تمام فیلدهای تمام ردیف های جدول استفاده خواهیم کرد، پرس و جو بسیار ساده است. همچنین می‌توانید نام فیلدها را مشخص کنید و شرایط WHERE را طبق دستور BigQuery Standard SQL در صورت نیاز اضافه کنید.

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

کد مدل را بنویسید

ما از همان کد مدل استفاده خواهیم کرد که در آموزش Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

فایل ماژول را در GCS کپی کنید که از اجزای خط لوله قابل دسترسی است. از آنجایی که آموزش مدل در GCP انجام می شود، باید این تعریف مدل را آپلود کنیم.

در غیر این صورت، ممکن است بخواهید یک تصویر ظرف شامل فایل ماژول بسازید و از تصویر برای اجرای خط لوله استفاده کنید.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

یک تعریف خط لوله بنویسید

ما تابعی را برای ایجاد خط لوله TFX تعریف می کنیم. ما باید از BigQueryExampleGen استفاده کنیم که query را به عنوان آرگومان می گیرد. یک تغییر دیگر نسبت به آموزش قبلی این است که ما باید beam_pipeline_args را ارسال کنیم که در هنگام اجرا به اجزا ارسال می شود. ما از beam_pipeline_args برای ارسال پارامترهای اضافی به BigQuery استفاده خواهیم کرد.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

خط لوله را روی خطوط لوله Vertex اجرا کنید.

همانطور که در Simple TFX Pipeline for Vertex Pipelines Tutorial انجام دادیم برای اجرای خط لوله از Vertex Pipelines استفاده خواهیم کرد.

همچنین باید beam_pipeline_args را برای BigQueryExampleGen ارسال کنیم. این شامل پیکربندی هایی مانند نام پروژه GCP و ذخیره سازی موقت برای اجرای BigQuery است.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

فایل تعریف تولید شده را می توان با استفاده از سرویس گیرنده kfp ارسال کرد.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

اکنون می‌توانید برای مشاهده پیشرفت به «Vertex AI > Pipelines» در Google Cloud Console مراجعه کنید.